|
I have an initial pass at a replacement for one-way
async support that uses a target invoker rather than an interceptor. I have been
able to test it in chianti and have ported it to the latest trunk. Because of
SCA SPI test failures, I can't yet build the main trunk successfully, so I
haven't verified the target invoker there. In any case, in the interest of
getting something into the trunk sooner rather than later,
here's the patch.
Note that the target invoker should be using a
monitor to flag an error that occurs during an invoke on a separate thread.
However, when I try to use a monitor I get the build error (in chianti) shown
below, so I have commented out the use of a monitor for now.
Running localwire.LocalWireTestCase
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.511 sec <<< FAILURE!
testMessage(localwire.LocalWireTestCase) Time elapsed: 0.501 sec <<< ERROR!
org.apache.tuscany.spi.builder.BuilderConfigException: No builder registered for implementation [org.apache.tuscany.core.implementation.java.JavaImplementation]
Context stack trace:
[TargetComponent]
    at org.apache.tuscany.core.builder.BuilderRegistryImpl.build(BuilderRegistryImpl.java:93)
    at org.apache.tuscany.core.implementation.system.builder.SystemCompositeBuilder.build(SystemCompositeBuilder.java:97)
    at org.apache.tuscany.core.builder.BuilderRegistryImpl.build(BuilderRegistryImpl.java:99)
    at org.apache.tuscany.core.deployer.DeployerImpl.build(DeployerImpl.java:125)
    at org.apache.tuscany.core.deployer.DeployerImpl.deploy(DeployerImpl.java:91)
    at org.apache.tuscany.core.launcher.Launcher.bootApplication(Launcher.java:195)
    at org.apache.tuscany.test.SCATestCase.setUp(SCATestCase.java:40)
    at junit.framework.TestCase.runBare(TestCase.java:125)
    at junit.framework.TestResult$1.protect(TestResult.java:106)
    at junit.framework.TestResult.runProtected(TestResult.java:124)
    at junit.framework.TestResult.run(TestResult.java:109)
    at junit.framework.TestCase.run(TestCase.java:118)
    at junit.framework.TestSuite.runTest(TestSuite.java:208)
    at junit.framework.TestSuite.run(TestSuite.java:203)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:585)
    at org.apache.maven.surefire.junit.JUnitTestSet.execute(JUnitTestSet.java:210)
    at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.executeTestSet(AbstractDirectoryTestSuite.java:135)
    at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.execute(AbstractDirectoryTestSuite.java:122)
    at org.apache.maven.surefire.Surefire.run(Surefire.java:129)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:585)
    at org.apache.maven.surefire.booter.SurefireBooter.runSuitesInProcess(SurefireBooter.java:225)
    at org.apache.maven.surefire.booter.SurefireBooter.main(SurefireBooter.java:747)
Results :
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0 |
Index:
sca/samples/supplychain/src/test/java/supplychain/SupplyChainClientTestCase.java
===================================================================
---
sca/samples/supplychain/src/test/java/supplychain/SupplyChainClientTestCase.java
(revision 423959)
+++
sca/samples/supplychain/src/test/java/supplychain/SupplyChainClientTestCase.java
(working copy)
@@ -40,6 +40,8 @@
System.out.println("Main thread " + Thread.currentThread());
customer.purchaseGoods();
+ System.out.println("Sleeping ...");
+ Thread.sleep(1000);
}
}
Index:
sca/core/src/main/java/org/apache/tuscany/core/policy/async/AsyncPolicyBuilder.java
===================================================================
---
sca/core/src/main/java/org/apache/tuscany/core/policy/async/AsyncPolicyBuilder.java
(revision 423959)
+++
sca/core/src/main/java/org/apache/tuscany/core/policy/async/AsyncPolicyBuilder.java
(working copy)
@@ -58,7 +58,7 @@
for (InboundInvocationChain chain :
wire.getInvocationChains().values()) {
// TODO fix this - it should be represented by the model and not
through an annotation
if (chain.getMethod().getAnnotation(OneWay.class) != null) {
- chain.addInterceptor(new AsyncInterceptor(workManager,
monitor));
+ // chain.addInterceptor(new AsyncInterceptor(workManager,
monitor));
}
}
}
Index:
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
===================================================================
---
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
(revision 423959)
+++
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaAtomicComponent.java
(working copy)
@@ -19,8 +19,10 @@
import java.lang.reflect.Method;
import org.apache.tuscany.core.implementation.PojoConfiguration;
+import org.apache.tuscany.spi.component.ComponentRuntimeException;
import org.apache.tuscany.spi.component.TargetException;
import org.apache.tuscany.spi.component.TargetNotFoundException;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
import org.apache.tuscany.spi.wire.InboundWire;
import org.apache.tuscany.spi.wire.TargetInvoker;
import org.apache.tuscany.spi.wire.OutboundWire;
@@ -28,6 +30,8 @@
import org.apache.tuscany.core.implementation.PojoAtomicComponent;
import org.apache.tuscany.core.injection.WireObjectFactory;
+import org.apache.tuscany.core.policy.async.AsyncMonitor;
+import org.osoa.sca.annotations.OneWay;
/**
* The runtime instantiation of Java component implementations
@@ -36,6 +40,9 @@
*/
public class JavaAtomicComponent<T> extends PojoAtomicComponent<T> {
+ private WorkScheduler workScheduler;
+ private AsyncMonitor monitor;
+
public JavaAtomicComponent(String name, PojoConfiguration configuration) {
super(name, configuration);
this.scope = configuration.getScopeContainer().getScope();
@@ -51,6 +58,18 @@
return wireService.createProxy(wire);
}
+ // REVIEW using setWorkScheduler rather than ctor parameter to avoid
+ // every use of ctor to be affected
+ public void setWorkScheduler(WorkScheduler workScheduler) {
+ this.workScheduler = workScheduler;
+ }
+
+ // REVIEW using setMonitor rather than ctor parameter to avoid
+ // every use of ctor to be affected
+ public void setMonitor(AsyncMonitor monitor) {
+ this.monitor = monitor;
+ }
+
public T getServiceInstance() throws TargetException {
if (serviceInterfaces.size() == 0) {
return getTargetInstance();
@@ -62,7 +81,22 @@
}
public TargetInvoker createTargetInvoker(String serviceName, Method
operation) {
- return new JavaTargetInvoker(operation, this);
+ TargetInvoker targetInvoker = null;
+ if (operation.getAnnotation(OneWay.class) != null) {
+ if (workScheduler == null) {
+ // TODO Make sure appropriate exception is thrown
+ throw new ComponentRuntimeException("Need an instance of
workScheduler");
+ }
+ if (monitor == null) {
+ // TODO Make sure appropriate exception is thrown
+ // throw new ComponentRuntimeException("Need an instance of
monitor");
+ }
+ targetInvoker = new AsyncJavaTargetInvoker(operation, this,
workScheduler, monitor);
+ }
+ else {
+ targetInvoker = new JavaTargetInvoker(operation, this);
+ }
+ return targetInvoker;
}
protected ObjectFactory<?> createWireFactory(OutboundWire wire) {
Index:
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
===================================================================
---
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
(revision 0)
+++
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/AsyncJavaTargetInvoker.java
(revision 0)
@@ -0,0 +1,152 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as
applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuscany.core.implementation.java;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.tuscany.core.policy.async.AsyncMonitor;
+import org.apache.tuscany.spi.services.work.WorkScheduler;
+import org.apache.tuscany.spi.wire.InvocationRuntimeException;
+import org.apache.tuscany.spi.wire.Message;
+import org.apache.tuscany.spi.wire.MessageChannel;
+import org.apache.tuscany.spi.wire.TargetInvoker;
+import org.osoa.sca.CompositeContext;
+import org.osoa.sca.CurrentCompositeContext;
+import org.osoa.sca.SCA;
+import org.osoa.sca.ServiceRuntimeException;
+
+/**
+ *
+ *
+ * @version $Rev: 420514 $ $Date: 2006-07-10 08:53:18 -0400 (Mon, 10 Jul 2006)
$
+ */
+public class AsyncJavaTargetInvoker extends JavaTargetInvoker {
+
+ private static final ContextBinder BINDER = new ContextBinder();
+ private static final Message RESPONSE = new ImmutableMessage();
+
+ private WorkScheduler workScheduler;
+ private AsyncMonitor monitor;
+
+ /**
+ * Creates a new invoker
+ *
+ * @param operation the operation the invoker is associated with
+ * @param context the scope component the component is resolved in
+ * @param workScheduler used to schedule the invocation work
+ */
+ public AsyncJavaTargetInvoker(Method operation, JavaAtomicComponent
context, WorkScheduler workScheduler, AsyncMonitor monitor) {
+ super(operation, context);
+ this.workScheduler = workScheduler;
+ this.monitor = monitor;
+ }
+
+ // Override invocation methods to defer invocation to a work item.
+ // invokeTarget returns the shared dummy RESPONSE message instead of a
+ // result; any real result would have to be conveyed by callback
+ @Override
+ public Object invokeTarget(Object payload) throws
InvocationTargetException {
+ final CompositeContext currentContext =
CurrentCompositeContext.getContext();
+ final Object finalPayload = payload;
+ // Schedule the invocation of the next interceptor in a new Work
instance
+ try {
+ workScheduler.scheduleWork(new Runnable() {
+ public void run() {
+ CompositeContext oldContext =
CurrentCompositeContext.getContext();
+ try {
+
AsyncJavaTargetInvoker.BINDER.setContext(currentContext);
+ // REVIEW response must be null for one-way and
non-null for callback
+ Object response =
AsyncJavaTargetInvoker.super.invokeTarget(finalPayload);
+ } catch (Exception e) {
+ // REVIEW uncomment when it is available
+ // monitor.executionError(e);
+ } finally {
+ AsyncJavaTargetInvoker.BINDER.setContext(oldContext);
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new ServiceRuntimeException(e);
+ }
+ return RESPONSE;
+ }
+
+ public Message invoke(Message msg) throws InvocationRuntimeException {
+ // can't just call overridden invoke because it would bypass async
+ try {
+ Object resp = invokeTarget(msg.getBody());
+ return (Message)resp;
+ } catch (InvocationTargetException e) {
+ return null;
+ } catch (Throwable e) {
+ return null;
+ }
+ }
+
+ public AsyncJavaTargetInvoker clone() throws CloneNotSupportedException {
+ AsyncJavaTargetInvoker invoker = (AsyncJavaTargetInvoker)
super.clone();
+ invoker.workScheduler = this.workScheduler;
+ invoker.monitor = this.monitor;
+
+ return invoker;
+ }
+
+ private static class ContextBinder extends SCA {
+ public void setContext(CompositeContext context) {
+ setCompositeContext(context);
+ }
+
+ public void start() {
+ throw new AssertionError();
+ }
+
+ public void stop() {
+ throw new AssertionError();
+ }
+ }
+
+ /**
+ * A dummy message passed back on an invocation
+ */
+ private static class ImmutableMessage implements Message {
+
+ public Object getBody() {
+ return null;
+ }
+
+ public void setBody(Object body) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setTargetInvoker(TargetInvoker invoker) {
+ throw new UnsupportedOperationException();
+ }
+
+ public TargetInvoker getTargetInvoker() {
+ return null;
+ }
+
+ public MessageChannel getCallbackChannel() {
+ return null;
+ }
+
+ public Message getRelatedCallbackMessage() {
+ return null;
+ }
+ }
+}
Index:
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaComponentBuilder.java
===================================================================
---
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaComponentBuilder.java
(revision 423959)
+++
sca/core/src/main/java/org/apache/tuscany/core/implementation/java/JavaComponentBuilder.java
(working copy)
@@ -42,6 +42,7 @@
import org.apache.tuscany.core.implementation.PojoConfiguration;
import org.apache.tuscany.core.injection.MethodEventInvoker;
import org.apache.tuscany.core.injection.PojoObjectFactory;
+import org.apache.tuscany.core.policy.async.AsyncMonitor;
import org.apache.tuscany.core.wire.InboundInvocationChainImpl;
import org.apache.tuscany.core.wire.InboundWireImpl;
import org.apache.tuscany.core.wire.InvokerInterceptor;
@@ -55,6 +56,35 @@
*/
public class JavaComponentBuilder extends
ComponentBuilderExtension<JavaImplementation> {
+ // private PolicyBuilderRegistry builderRegistry;
+ // private AsyncMonitor monitor;
+
+ // REVIEW this seems to cause build problems
+ /*
+ @Init(eager = true)
+ public void init() {
+ // REVIEW is this needed?
+ // builderRegistry.registerTargetBuilder(INITIAL, this);
+ if (monitor == null) {
+ monitor = new NullMonitorFactory().getMonitor(AsyncMonitor.class);
+ }
+ }
+ */
+
+ /*
+ @Autowire
+ public void setBuilderRegistry(PolicyBuilderRegistry builderRegistry) {
+ this.builderRegistry = builderRegistry;
+ }
+ */
+
+ /*
+ @org.apache.tuscany.spi.annotation.Monitor
+ public void setMonitor(AsyncMonitor monitor) {
+ this.monitor = monitor;
+ }
+ */
+
@SuppressWarnings("unchecked")
public AtomicComponent<?> build(CompositeComponent<?> parent,
ComponentDefinition<JavaImplementation>
definition,
@@ -108,6 +138,9 @@
configuration.getConstructorParamNames().addAll(ctorDef.getInjectionNames());
JavaAtomicComponent component = new
JavaAtomicComponent(definition.getName(), configuration);
+ // REVIEW setting these (as opposed to using ctor parms) may not be
appropriate
+ component.setWorkScheduler(workScheduler);
+ // component.setMonitor(monitor);
// handle properties
for (JavaMappedProperty<?> property :
componentType.getProperties().values()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]
