Author: gnodet
Date: Fri Oct 19 11:29:58 2007
New Revision: 586570
URL: http://svn.apache.org/viewvc?rev=586570&view=rev
Log:
SM-1110: ServiceMix is not sending a response back to the calling Service in an
In-Out Message Exchange
Added:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
- copied, changed from r579576,
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java
Modified:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
Modified:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=586570&r1=586569&r2=586570&view=diff
==============================================================================
---
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
(original)
+++
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
Fri Oct 19 11:29:58 2007
@@ -19,6 +19,8 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.logging.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -30,10 +32,19 @@
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
-import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.jbi.JBIException;
+import javax.jbi.management.MBeanNames;
+import javax.xml.namespace.QName;
+import javax.management.MBeanServer;
+import javax.naming.InitialContext;
+
+import org.w3c.dom.DocumentFragment;
+import org.w3c.dom.Document;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.jexl.Expression;
@@ -55,8 +66,8 @@
import org.apache.servicemix.jbi.resolver.URIResolver;
import org.apache.servicemix.jbi.util.MessageUtil;
import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
/**
* Represents a bean endpoint which consists of a together with a [EMAIL
PROTECTED] MethodInvocationStrategy}
@@ -75,7 +86,7 @@
private String beanClassName;
private MethodInvocationStrategy methodInvocationStrategy;
private org.apache.servicemix.expression.Expression correlationExpression;
-
+
private Map<String, Holder> exchanges = new ConcurrentHashMap<String,
Holder>();
private Map<Object, Request> requests = new ConcurrentHashMap<Object,
Request>();
private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
@@ -87,7 +98,7 @@
public BeanEndpoint(BeanComponent component, ServiceEndpoint
serviceEndpoint) {
super(component, serviceEndpoint);
- setApplicationContext(component.getApplicationContext());
+ this.applicationContext = component.getApplicationContext();
}
public void start() throws Exception {
@@ -201,7 +212,7 @@
throw new IllegalStateException("Unknown role: " +
exchange.getRole());
}
}
-
+
protected void onProviderExchange(MessageExchange exchange) throws
Exception {
Object corId = getCorrelation(exchange);
Request req = requests.get(corId);
@@ -225,16 +236,16 @@
// Exchange is finished
if (exchange.getStatus() == ExchangeStatus.DONE) {
return;
- }
// Exchange has been aborted with an exception
- else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
return;
- // Fault message
+ // Fault message
} else if (exchange.getFault() != null) {
// TODO: find a way to send it back to the bean before
setting the DONE status
done(exchange);
} else {
- MethodInvocation invocation =
getMethodInvocationStrategy().createInvocation(req.getBean(), getBeanInfo(),
exchange, this);
+ MethodInvocation invocation =
getMethodInvocationStrategy().createInvocation(
+ req.getBean(), getBeanInfo(), exchange, this);
if (invocation == null) {
throw new
UnknownMessageExchangeTypeException(exchange, this);
}
@@ -258,7 +269,7 @@
currentRequest.set(null);
}
}
-
+
protected void onConsumerExchange(MessageExchange exchange) throws
Exception {
Object corId = exchange.getExchangeId();
Request req = requests.remove(corId);
@@ -281,7 +292,7 @@
checkEndOfRequest(req, corId);
currentRequest.set(null);
}
-
+
protected Object getCorrelation(MessageExchange exchange) throws
MessagingException {
return getCorrelationExpression().evaluate(exchange,
exchange.getMessage("in"));
}
@@ -315,30 +326,32 @@
/**
* A strategy method to allow implementations to perform some custom JBI
based injection of the POJO
*
- * @param bean the bean to be injected
+ * @param pojo the bean to be injected
*/
- protected void injectBean(final Object bean) {
+ protected void injectBean(final Object pojo) {
+ final PojoContext ctx = new PojoContext();
+ final DeliveryChannel ch = ctx.channel;
// Inject fields
- ReflectionUtils.doWithFields(bean.getClass(), new
ReflectionUtils.FieldCallback() {
+ ReflectionUtils.doWithFields(pojo.getClass(), new
ReflectionUtils.FieldCallback() {
public void doWith(Field f) throws IllegalArgumentException,
IllegalAccessException {
ExchangeTarget et = f.getAnnotation(ExchangeTarget.class);
if (et != null) {
- ReflectionUtils.setField(f, bean, new
DestinationImpl(et.uri(), BeanEndpoint.this));
+ ReflectionUtils.setField(f, pojo, new
DestinationImpl(et.uri(), BeanEndpoint.this));
}
if (f.getAnnotation(Resource.class) != null) {
if (ComponentContext.class.isAssignableFrom(f.getType())) {
- ReflectionUtils.setField(f, bean, context);
+ ReflectionUtils.setField(f, pojo, ctx);
} else if
(DeliveryChannel.class.isAssignableFrom(f.getType())) {
- ReflectionUtils.setField(f, bean, channel);
+ ReflectionUtils.setField(f, pojo, ch);
}
}
}
});
}
-
+
protected void evaluateCallbacks(final Request req) {
- final Object bean = req.getBean();
- ReflectionUtils.doWithMethods(bean.getClass(), new
ReflectionUtils.MethodCallback() {
+ final Object pojo = req.getBean();
+ ReflectionUtils.doWithMethods(pojo.getClass(), new
ReflectionUtils.MethodCallback() {
@SuppressWarnings("unchecked")
public void doWith(Method method) throws IllegalArgumentException,
IllegalAccessException {
if (method.getAnnotation(Callback.class) != null) {
@@ -347,14 +360,15 @@
JexlContext jc = JexlHelper.createContext();
jc.getVars().put("this", bean);
Object r = e.evaluate(jc);
- if (r instanceof Boolean == false) {
+ if (!(r instanceof Boolean)) {
throw new RuntimeException("Expression did not
returned a boolean value but: " + r);
}
Boolean oldVal = req.getCallbacks().get(method);
Boolean newVal = (Boolean) r;
- if ((oldVal == null || oldVal == false) && newVal ==
true) {
+ if ((oldVal == null || !oldVal) && newVal) {
req.getCallbacks().put(method, newVal);
- Object o = method.invoke(bean, new Object[0]);
+ Object o = method.invoke(pojo, new Object[0]);
+ o.toString();
// TODO: handle return value and sent it as the
answer
}
} catch (Exception e) {
@@ -364,7 +378,7 @@
}
});
}
-
+
/**
* Used by POJOs acting as a consumer
* @param uri
@@ -376,7 +390,7 @@
InOut me = getExchangeFactory().createInOutExchange();
URIResolver.configureExchange(me,
getServiceUnit().getComponent().getComponentContext(), uri);
MessageUtil.transferTo(message, me, "in");
- final Holder h = new Holder();
+ final Holder h = new Holder();
requests.put(me.getExchangeId(), currentRequest.get());
exchanges.put(me.getExchangeId(), h);
BeanEndpoint.this.send(me);
@@ -385,7 +399,7 @@
throw new RuntimeException(e);
}
}
-
+
protected void checkEndOfRequest(Request request, Object corId) {
if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
ReflectionUtils.callLifecycleMethod(request.getBean(),
PreDestroy.class);
@@ -425,5 +439,138 @@
*/
public void
setCorrelationExpression(org.apache.servicemix.expression.Expression
correlationExpression) {
this.correlationExpression = correlationExpression;
+ }
+
+ protected class PojoContext implements ComponentContext {
+
+ private DeliveryChannel channel = new PojoChannel();
+
+ public ServiceEndpoint activateEndpoint(QName qName, String s) throws
JBIException {
+ return context.activateEndpoint(qName, s);
+ }
+
+ public void deactivateEndpoint(ServiceEndpoint serviceEndpoint) throws
JBIException {
+ context.deactivateEndpoint(serviceEndpoint);
+ }
+
+ public void registerExternalEndpoint(ServiceEndpoint serviceEndpoint)
throws JBIException {
+ context.registerExternalEndpoint(serviceEndpoint);
+ }
+
+ public void deregisterExternalEndpoint(ServiceEndpoint
serviceEndpoint) throws JBIException {
+ context.deregisterExternalEndpoint(serviceEndpoint);
+ }
+
+ public ServiceEndpoint resolveEndpointReference(DocumentFragment
documentFragment) {
+ return context.resolveEndpointReference(documentFragment);
+ }
+
+ public String getComponentName() {
+ return context.getComponentName();
+ }
+
+ public DeliveryChannel getDeliveryChannel() throws MessagingException {
+ return channel;
+ }
+
+ public ServiceEndpoint getEndpoint(QName qName, String s) {
+ return context.getEndpoint(qName, s);
+ }
+
+ public Document getEndpointDescriptor(ServiceEndpoint serviceEndpoint)
throws JBIException {
+ return context.getEndpointDescriptor(serviceEndpoint);
+ }
+
+ public ServiceEndpoint[] getEndpoints(QName qName) {
+ return context.getEndpoints(qName);
+ }
+
+ public ServiceEndpoint[] getEndpointsForService(QName qName) {
+ return context.getEndpointsForService(qName);
+ }
+
+ public ServiceEndpoint[] getExternalEndpoints(QName qName) {
+ return context.getExternalEndpoints(qName);
+ }
+
+ public ServiceEndpoint[] getExternalEndpointsForService(QName qName) {
+ return context.getExternalEndpointsForService(qName);
+ }
+
+ public String getInstallRoot() {
+ return context.getInstallRoot();
+ }
+
+ public Logger getLogger(String s, String s1) throws
MissingResourceException, JBIException {
+ return context.getLogger(s, s1);
+ }
+
+ public MBeanNames getMBeanNames() {
+ return context.getMBeanNames();
+ }
+
+ public MBeanServer getMBeanServer() {
+ return context.getMBeanServer();
+ }
+
+ public InitialContext getNamingContext() {
+ return context.getNamingContext();
+ }
+
+ public Object getTransactionManager() {
+ return context.getTransactionManager();
+ }
+
+ public String getWorkspaceRoot() {
+ return context.getWorkspaceRoot();
+ }
+ }
+
+ protected class PojoChannel implements DeliveryChannel {
+
+ public void close() throws MessagingException {
+ BeanEndpoint.this.channel.close();
+ }
+
+ public MessageExchangeFactory createExchangeFactory() {
+ return BeanEndpoint.this.channel.createExchangeFactory();
+ }
+
+ public MessageExchangeFactory createExchangeFactory(QName qName) {
+ return BeanEndpoint.this.channel.createExchangeFactory(qName);
+ }
+
+ public MessageExchangeFactory createExchangeFactoryForService(QName
qName) {
+ return
BeanEndpoint.this.channel.createExchangeFactoryForService(qName);
+ }
+
+ public MessageExchangeFactory createExchangeFactory(ServiceEndpoint
serviceEndpoint) {
+ return
BeanEndpoint.this.channel.createExchangeFactory(serviceEndpoint);
+ }
+
+ public MessageExchange accept() throws MessagingException {
+ return BeanEndpoint.this.channel.accept();
+ }
+
+ public MessageExchange accept(long l) throws MessagingException {
+ return BeanEndpoint.this.channel.accept(l);
+ }
+
+ public void send(MessageExchange messageExchange) throws
MessagingException {
+ if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
+ && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+ requests.put(messageExchange.getExchangeId(),
currentRequest.get());
+ }
+ BeanEndpoint.this.channel.send(messageExchange);
+ }
+
+ public boolean sendSync(MessageExchange messageExchange) throws
MessagingException {
+ return BeanEndpoint.this.channel.sendSync(messageExchange);
+ }
+
+ public boolean sendSync(MessageExchange messageExchange, long l)
throws MessagingException {
+ return BeanEndpoint.this.channel.sendSync(messageExchange, l);
+ }
+
}
}
Copied:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
(from r579576,
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java)
URL:
http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java?p2=incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java&p1=incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java&r1=579576&r2=586570&rev=586570&view=diff
==============================================================================
---
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java
(original)
+++
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
Fri Oct 19 11:29:58 2007
@@ -24,7 +24,7 @@
import junit.framework.TestCase;
-import org.apache.servicemix.bean.beans.ConsumerBean;
+import org.apache.servicemix.bean.beans.ConsumerListener;
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.components.util.EchoComponent;
@@ -32,30 +32,27 @@
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
-public class ConsumerBeanTest extends TestCase {
+public class ConsumerListenerTest extends TestCase {
protected JBIContainer jbi;
-
+
protected void setUp() throws Exception {
jbi = new JBIContainer();
jbi.setEmbedded(true);
jbi.init();
}
-
+
public void test() throws Exception {
BeanComponent bc = new BeanComponent();
BeanEndpoint ep = new BeanEndpoint();
ep.setService(new QName("bean"));
ep.setEndpoint("endpoint");
- ep.setBeanType(ConsumerBean.class);
+ ep.setBeanType(ConsumerListener.class);
bc.setEndpoints(new BeanEndpoint[] { ep });
jbi.activateComponent(bc, "servicemix-bean");
-
- EchoComponent echo1 = new EchoComponent(new QName("urn", "service1"),
"endpoint");
- jbi.activateComponent(echo1, "echo1");
-
- EchoComponent echo2 = new EchoComponent(new QName("urn", "service2"),
"endpoint");
- jbi.activateComponent(echo2, "echo2");
+
+ EchoComponent echo = new EchoComponent(new QName("echo"), "endpoint");
+ jbi.activateComponent(echo, "echo");
jbi.start();
@@ -69,7 +66,7 @@
assertExchangeWorked(me);
client.done(me);
}
-
+
protected void assertExchangeWorked(MessageExchange me) throws Exception {
if (me.getStatus() == ExchangeStatus.ERROR) {
if (me.getError() != null) {
@@ -84,4 +81,4 @@
}
}
-}
+}
\ No newline at end of file
Modified:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java?rev=586570&r1=586569&r2=586570&view=diff
==============================================================================
---
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
(original)
+++
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
Fri Oct 19 11:29:58 2007
@@ -16,15 +16,17 @@
*/
package org.apache.servicemix.bean.beans;
-import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.jbi.util.MessageUtil;
-
import javax.annotation.Resource;
import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.jbi.util.MessageUtil;
public class ConsumerListener implements MessageExchangeListener {
@@ -32,16 +34,28 @@
private DeliveryChannel channel;
public void onMessageExchange(MessageExchange exchange) throws
MessagingException {
- MessageExchangeFactory factory = channel.createExchangeFactory();
- InOut io = factory.createInOutExchange();
- try {
- MessageUtil.transferInToIn(exchange, io);
- }
- catch (MessagingException e) {
- throw e;
- }
- catch (Exception e) {
- throw new MessagingException(e);
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ MessageExchange io = (MessageExchange)
exchange.getProperty("exchange");
+ MessageUtil.transferOutToOut(exchange, io);
+ io.setProperty("exchange", exchange);
+ channel.send(io);
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ MessageExchange io = (MessageExchange)
exchange.getProperty("exchange");
+ io.setStatus(ExchangeStatus.DONE);
+ channel.send(io);
+ }
+ } else {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ MessageExchangeFactory factory =
channel.createExchangeFactory();
+ InOut io = factory.createInOutExchange();
+ MessageUtil.transferInToIn(exchange, io);
+ io.setService(new QName("echo"));
+ io.setProperty("exchange", exchange);
+ channel.send(io);
+ } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+ // Do nothing
+ }
}
}