Author: gnodet
Date: Sun Oct 15 12:57:56 2006
New Revision: 464261
URL: http://svn.apache.org/viewvc?view=rev&rev=464261
Log:
Add a @Correlation annotation (need improvement)
Handle @Operation with named operations
Change MessageExchangeListener beans so that they have full control over JBI mep
Added:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Correlation.java
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/DestinationImpl.java
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Holder.java
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ListenerBeanEndpointTest.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
Removed:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/ExchangeDriven.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ExchangeProcessorBeanEndpointTest.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ExchangeProcessorBean.java
Modified:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/AnnotatedBean.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ListenerBean.java
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/PlainBean.java
incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring-no-endpoints.xml
incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring.xml
Modified:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanComponent.java
Sun Oct 15 12:57:56 2006
@@ -16,11 +16,11 @@
*/
package org.apache.servicemix.bean;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.URISupport;
import org.apache.servicemix.common.DefaultComponent;
import org.apache.servicemix.common.Endpoint;
import org.apache.servicemix.common.ResolvedEndpoint;
+import org.apache.servicemix.jbi.util.IntrospectionSupport;
+import org.apache.servicemix.jbi.util.URISupport;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
@@ -112,6 +112,9 @@
}
Map map = URISupport.parseQuery(uri.getQuery());
+ if (endpoint.getBean() == null) {
+ endpoint.setBean(endpoint.createBean());
+ }
IntrospectionSupport.setProperties(endpoint.getBean(), map);
endpoint.activate();
Modified:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
Sun Oct 15 12:57:56 2006
@@ -18,22 +18,21 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import org.aopalliance.intercept.MethodInvocation;
@@ -44,12 +43,15 @@
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.bean.support.BeanInfo;
import org.apache.servicemix.bean.support.DefaultMethodInvocationStrategy;
+import org.apache.servicemix.bean.support.DestinationImpl;
+import org.apache.servicemix.bean.support.Holder;
import org.apache.servicemix.bean.support.MethodInvocationStrategy;
import org.apache.servicemix.bean.support.ReflectionUtils;
+import org.apache.servicemix.bean.support.Request;
import org.apache.servicemix.common.EndpointComponentContext;
-import org.apache.servicemix.common.ExchangeProcessor;
import org.apache.servicemix.common.ProviderEndpoint;
-import org.apache.servicemix.jbi.FaultException;
+import org.apache.servicemix.expression.JAXPStringXPathExpression;
+import org.apache.servicemix.expression.PropertyExpression;
import org.apache.servicemix.jbi.resolver.URIResolver;
import org.apache.servicemix.jbi.util.MessageUtil;
import org.springframework.beans.BeansException;
@@ -62,16 +64,20 @@
*
* @version $Revision: $
* @org.apache.xbean.XBean element="endpoint"
+ *
+ * TODO: handle correlations to create / scope beans
*/
public class BeanEndpoint extends ProviderEndpoint implements BeanFactoryAware
{
private BeanFactory beanFactory;
private String beanName;
private Object bean;
private BeanInfo beanInfo;
+ private Class<?> beanType;
private MethodInvocationStrategy methodInvocationStrategy;
+ private org.apache.servicemix.expression.Expression correlationExpression;
private Map<String, Holder> exchanges = new ConcurrentHashMap<String,
Holder>();
- private Map<String, Request> requests = new ConcurrentHashMap<String,
Request>();
+ private Map<Object, Request> requests = new ConcurrentHashMap<Object,
Request>();
private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
private ComponentContext context;
private DeliveryChannel channel;
@@ -86,42 +92,28 @@
public void start() throws Exception {
super.start();
- context = new
EndpointComponentContext(getServiceUnit().getComponent().getComponentContext());
+ context = new EndpointComponentContext(this);
channel = context.getDeliveryChannel();
-
- if (getBean() == null) {
- throw new IllegalArgumentException("No 'bean' property set");
+ Object pojo = getBean();
+ if (pojo != null) {
+ injectBean(pojo);
+ ReflectionUtils.callLifecycleMethod(pojo, PostConstruct.class);
}
+ beanType = pojo != null ? pojo.getClass() : createBean().getClass();
if (getMethodInvocationStrategy() == null) {
throw new IllegalArgumentException("No 'methodInvocationStrategy'
property set");
}
-
- injectBean(getBean());
- // Call PostConstruct annotated methods
- ReflectionUtils.callLifecycleMethod(getBean(), PostConstruct.class);
-
- if (getBean() instanceof ExchangeProcessor) {
- ExchangeProcessor processor = (ExchangeProcessor) getBean();
- processor.start();
- }
}
public void stop() throws Exception {
super.stop();
-
- if (getBean() instanceof ExchangeProcessor) {
- ExchangeProcessor processor = (ExchangeProcessor) getBean();
- processor.stop();
+ Object pojo = getBean();
+ if (pojo != null) {
+ ReflectionUtils.callLifecycleMethod(pojo, PreDestroy.class);
}
-
- // TODO invoke the beans destroy methods for @PreDestroy
-
- // lets allow garbage collection to take place
- bean = null;
}
-
public BeanFactory getBeanFactory() {
return beanFactory;
}
@@ -139,9 +131,6 @@
}
public Object getBean() {
- if (bean == null) {
- bean = createBean();
- }
return bean;
}
@@ -152,7 +141,7 @@
public BeanInfo getBeanInfo() {
if (beanInfo == null) {
- beanInfo = new BeanInfo(getBean().getClass(),
getMethodInvocationStrategy());
+ beanInfo = new BeanInfo(beanType, getMethodInvocationStrategy());
}
return beanInfo;
}
@@ -173,9 +162,11 @@
}
+ @Override
public void process(MessageExchange exchange) throws Exception {
- if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+ if (exchange.getRole() == Role.CONSUMER) {
onConsumerExchange(exchange);
+ // Find or create the request for this provider exchange
} else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
onProviderExchange(exchange);
} else {
@@ -184,58 +175,85 @@
}
protected void onProviderExchange(MessageExchange exchange) throws
Exception {
- // Exchange is finished
- if (exchange.getStatus() == ExchangeStatus.DONE) {
- return;
- }
- // Exchange has been aborted with an exception
- else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- return;
- // Fault message
- } else if (exchange.getFault() != null) {
- // TODO: find a way to send it back to the bean before setting the
DONE status
- done(exchange);
+ Object corId = getCorrelation(exchange);
+ Request req = requests.get(corId);
+ if (req == null) {
+ Object pojo = getBean();
+ if (pojo == null) {
+ pojo = createBean();
+ injectBean(pojo);
+ ReflectionUtils.callLifecycleMethod(bean, PostConstruct.class);
+ }
+ req = new Request(pojo, exchange);
+ requests.put(corId, req);
+ }
+ currentRequest.set(req);
+ // If the bean implements MessageExchangeListener,
+ // just call the method
+ if (req.getBean() instanceof MessageExchangeListener) {
+ ((MessageExchangeListener)
req.getBean()).onMessageExchange(exchange);
} else {
- onMessageExchange(exchange);
- done(exchange);
+ // Exchange is finished
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ }
+ // Exchange has been aborted with an exception
+ else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ // Fault message
+ } else if (exchange.getFault() != null) {
+ // TODO: find a way to send it back to the bean before setting
the DONE status
+ done(exchange);
+ } else {
+ MethodInvocation invocation =
getMethodInvocationStrategy().createInvocation(req.getBean(), getBeanInfo(),
exchange, this);
+ if (invocation == null) {
+ throw new UnknownMessageExchangeTypeException(exchange,
this);
+ }
+ try {
+ invocation.proceed();
+ } catch (Exception e) {
+ throw e;
+ } catch (Throwable throwable) {
+ throw new MethodInvocationFailedException(req.getBean(),
invocation, exchange, this, throwable);
+ }
+ if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ send(exchange);
+ }
+ if (exchange.getFault() == null && exchange.getMessage("out")
== null) {
+ // TODO: handle MEP correctly (DONE should only be sent
for InOnly)
+ done(exchange);
+ }
+ }
}
+ checkEndOfRequest(req);
+ currentRequest.set(null);
}
protected void onConsumerExchange(MessageExchange exchange) throws
Exception {
- Holder me = exchanges.get(exchange.getExchangeId());
- if (me == null) {
- throw new IllegalStateException("Consumer exchange not found");
+ Object corId = exchange.getExchangeId();
+ Request req = requests.remove(corId);
+ if (req == null) {
+ throw new IllegalStateException("Receiving unknown consumer
exchange: " + exchange);
}
- me.set(exchange);
- evaluateCallbacks(requests.remove(exchange.getExchangeId()));
- }
-
- protected void onMessageExchange(MessageExchange exchange) throws
Exception {
- Request req = new Request(getBean(), exchange);
- requests.put(exchange.getExchangeId(), req);
currentRequest.set(req);
- Object pojo = req.getBean();
- if (pojo instanceof MessageExchangeListener) {
- MessageExchangeListener listener = (MessageExchangeListener) pojo;
- listener.onMessageExchange(exchange);
- }
- else if (pojo instanceof ExchangeProcessor) {
- ExchangeProcessor processor = (ExchangeProcessor) pojo;
- processor.process(exchange);
- }
- else {
- MethodInvocation invocation =
getMethodInvocationStrategy().createInvocation(pojo, getBeanInfo(), exchange,
this);
- if (invocation == null) {
- throw new UnknownMessageExchangeTypeException(exchange, this);
- }
- try {
- invocation.proceed();
- } catch (Exception e) {
- throw e;
- } catch (Throwable throwable) {
- throw new MethodInvocationFailedException(pojo, invocation,
exchange, this, throwable);
+ // If the bean implements MessageExchangeListener,
+ // just call the method
+ if (req.getBean() instanceof MessageExchangeListener) {
+ ((MessageExchangeListener)
req.getBean()).onMessageExchange(exchange);
+ } else {
+ Holder me = exchanges.get(exchange.getExchangeId());
+ if (me == null) {
+ throw new IllegalStateException("Consumer exchange not found");
}
+ me.set(exchange);
+ evaluateCallbacks(req);
}
+ checkEndOfRequest(req);
+ currentRequest.set(null);
+ }
+
+ protected Object getCorrelation(MessageExchange exchange) throws
MessagingException {
+ return getCorrelationExpression().evaluate(exchange,
exchange.getMessage("in"));
}
protected Object createBean() {
@@ -264,7 +282,7 @@
public void doWith(Field f) throws IllegalArgumentException,
IllegalAccessException {
ExchangeTarget et = f.getAnnotation(ExchangeTarget.class);
if (et != null) {
- ReflectionUtils.setField(f, bean, new
DestinationImpl(et.uri()));
+ ReflectionUtils.setField(f, bean, new
DestinationImpl(et.uri(), BeanEndpoint.this));
}
if (f.getAnnotation(Resource.class) != null) {
if (ComponentContext.class.isAssignableFrom(f.getType())) {
@@ -277,7 +295,7 @@
});
}
- protected void evaluateCallbacks(Request req) {
+ protected void evaluateCallbacks(final Request req) {
final Object bean = req.getBean();
ReflectionUtils.doWithMethods(bean.getClass(), new
ReflectionUtils.MethodCallback() {
@SuppressWarnings("unchecked")
@@ -288,9 +306,15 @@
JexlContext jc = JexlHelper.createContext();
jc.getVars().put("this", bean);
Object r = e.evaluate(jc);
- if (r instanceof Boolean && ((Boolean)
r).booleanValue()) {
+ if (r instanceof Boolean == false) {
+ throw new RuntimeException("Expression did not
returned a boolean value but: " + r);
+ }
+ boolean oldVal = req.getCallbacks().get(method);
+ boolean newVal = (Boolean) r;
+ if (oldVal == false && newVal == true) {
+ req.getCallbacks().put(method, newVal);
Object o = method.invoke(bean, new Object[0]);
- // TODO
+ // TODO: handle return value and sent it as the
answer
}
} catch (Exception e) {
throw new RuntimeException("Unable to invoke
callback", e);
@@ -300,122 +324,66 @@
});
}
- public class DestinationImpl implements Destination {
-
- private final String uri;
-
- public DestinationImpl(String uri) {
- this.uri = uri;
- }
-
- public NormalizedMessage createMessage() {
- return new MessageUtil.NormalizedMessageImpl();
- }
-
- public Future<NormalizedMessage> send(NormalizedMessage message) {
- try {
- InOut me = getExchangeFactory().createInOutExchange();
- URIResolver.configureExchange(me,
getServiceUnit().getComponent().getComponentContext(), uri);
- MessageUtil.transferTo(message, me, "in");
- return send(me);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- protected Future<NormalizedMessage> send(final MessageExchange me)
throws Exception {
+ /**
+ * Used by POJOs acting as a consumer
+ * @param uri
+ * @param message
+ * @return
+ */
+ public Future<NormalizedMessage> send(String uri, NormalizedMessage
message) {
+ try {
+ InOut me = getExchangeFactory().createInOutExchange();
+ URIResolver.configureExchange(me,
getServiceUnit().getComponent().getComponentContext(), uri);
+ MessageUtil.transferTo(message, me, "in");
final Holder h = new Holder();
requests.put(me.getExchangeId(), currentRequest.get());
exchanges.put(me.getExchangeId(), h);
BeanEndpoint.this.send(me);
return h;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
- public static class Request {
- private Object bean;
- private MessageExchange exchange;
- private Set<String> sentExchanges = new HashSet<String>();
-
- public Request() {
- }
-
- public Request(Object bean, MessageExchange exchange) {
- this.bean = bean;
- this.exchange = exchange;
- }
-
- /**
- * @return the bean
- */
- public Object getBean() {
- return bean;
- }
- /**
- * @param bean the bean to set
- */
- public void setBean(Object bean) {
- this.bean = bean;
- }
- /**
- * @return the exchange
- */
- public MessageExchange getExchange() {
- return exchange;
- }
- /**
- * @param exchange the exchange to set
- */
- public void setExchange(MessageExchange exchange) {
- this.exchange = exchange;
- }
- /**
- * @param id the id of the exchange sent
- */
- public void addSentExchange(String id) {
- sentExchanges.add(id);
+ protected void checkEndOfRequest(Request request) {
+ if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
+ ReflectionUtils.callLifecycleMethod(request.getBean(),
PreDestroy.class);
+ request.setBean(null);
+ request.setExchange(null);
+ requests.remove(request);
}
}
-
- public static class Holder implements Future<NormalizedMessage> {
-
- private MessageExchange object;
- private boolean cancel;
-
- public synchronized NormalizedMessage get() throws
InterruptedException, ExecutionException {
- if (object == null) {
- wait();
- }
- return extract(object);
- }
- public synchronized NormalizedMessage get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
- if (object == null) {
- wait(unit.toMillis(timeout));
+
+ /**
+ * @return the correlationExpression
+ */
+ public org.apache.servicemix.expression.Expression
getCorrelationExpression() {
+ if (correlationExpression == null) {
+ // Find correlation expression
+ Correlation cor = beanType.getAnnotation(Correlation.class);
+ if (cor != null) {
+ if (cor.property() != null) {
+ correlationExpression = new
PropertyExpression(cor.property());
+ } else if (cor.xpath() != null) {
+ correlationExpression = new
JAXPStringXPathExpression(cor.xpath());
+ }
}
- return extract(object);
- }
- public synchronized void set(MessageExchange t) {
- object = t;
- notifyAll();
- }
- public boolean cancel(boolean mayInterruptIfRunning) {
- cancel = true;
- return false;
- }
- public boolean isCancelled() {
- return cancel;
- }
- public boolean isDone() {
- return object != null;
- }
- protected NormalizedMessage extract(MessageExchange me) throws
ExecutionException {
- if (me.getStatus() == ExchangeStatus.ERROR) {
- throw new ExecutionException(me.getError());
- } else if (me.getFault() != null) {
- throw new ExecutionException(new FaultException("Fault
occured", me, me.getFault()));
- } else {
- return me.getMessage("out");
+ if (correlationExpression == null) {
+ correlationExpression = new
org.apache.servicemix.expression.Expression() {
+ public Object evaluate(MessageExchange exchange,
NormalizedMessage message) throws MessagingException {
+ return exchange.getExchangeId();
+ }
+ };
}
}
+ return correlationExpression;
}
+
+ /**
+ * @param correlationExpression the correlationExpression to set
+ */
+ public void
setCorrelationExpression(org.apache.servicemix.expression.Expression
correlationExpression) {
+ this.correlationExpression = correlationExpression;
+ }
+
}
Added:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Correlation.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Correlation.java?view=auto&rev=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Correlation.java
(added)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Correlation.java
Sun Oct 15 12:57:56 2006
@@ -0,0 +1,16 @@
+package org.apache.servicemix.bean;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
[EMAIL PROTECTED](RetentionPolicy.RUNTIME)
[EMAIL PROTECTED]({ElementType.TYPE})
+public @interface Correlation {
+
+ String property() default "";
+
+ String xpath() default "";
+
+}
Modified:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java
Sun Oct 15 12:57:56 2006
@@ -26,6 +26,16 @@
@Target( { METHOD })
public @interface Operation {
+ enum MEP {
+ InOnly,
+ RobustInOnly,
+ InOut,
+ InOptionalOut,
+ Default,
+ }
+
String name() default "";
+ MEP mep() default MEP.Default;
+
}
Modified:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java
Sun Oct 15 12:57:56 2006
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.bean.Content;
+import org.apache.servicemix.bean.Operation;
import org.apache.servicemix.bean.Property;
import org.apache.servicemix.bean.XPath;
import org.apache.servicemix.components.util.MessageHelper;
@@ -110,8 +111,12 @@
}
// now lets add the method to the repository
+ String opName = method.getName();
+ if (method.getAnnotation(Operation.class) != null) {
+ opName = method.getAnnotation(Operation.class).name();
+ }
Expression parametersExpression =
createMethodParametersExpression(parameterExpressions);
- operations.put(method.getName(), new MethodInfo(type, method,
parametersExpression));
+ operations.put(opName, new MethodInfo(type, method,
parametersExpression));
}
protected Expression createMethodParametersExpression(final Expression[]
parameterExpressions) {
@@ -140,7 +145,6 @@
return answer;
}
}
-
return strategy.getDefaultParameterTypeExpression(parameterType);
}
Added:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/DestinationImpl.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/DestinationImpl.java?view=auto&rev=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/DestinationImpl.java
(added)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/DestinationImpl.java
Sun Oct 15 12:57:56 2006
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicemix.bean.support;
+
+import java.util.concurrent.Future;
+
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.bean.BeanEndpoint;
+import org.apache.servicemix.bean.Destination;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public class DestinationImpl implements Destination {
+
+ private final BeanEndpoint endpoint;
+ private final String uri;
+
+ public DestinationImpl(String uri, BeanEndpoint endpoint) {
+ this.uri = uri;
+ this.endpoint = endpoint;
+ }
+
+ public NormalizedMessage createMessage() {
+ return new MessageUtil.NormalizedMessageImpl();
+ }
+
+ public Future<NormalizedMessage> send(NormalizedMessage message) {
+ return endpoint.send(uri, message);
+ }
+
+}
\ No newline at end of file
Added:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Holder.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Holder.java?view=auto&rev=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Holder.java
(added)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Holder.java
Sun Oct 15 12:57:56 2006
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicemix.bean.support;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.jbi.FaultException;
+
+public class Holder implements Future<NormalizedMessage> {
+
+ private MessageExchange object;
+ private boolean cancel;
+
+ public synchronized NormalizedMessage get() throws InterruptedException,
ExecutionException {
+ if (object == null) {
+ wait();
+ }
+ return extract(object);
+ }
+ public synchronized NormalizedMessage get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException {
+ if (object == null) {
+ wait(unit.toMillis(timeout));
+ }
+ return extract(object);
+ }
+ public synchronized void set(MessageExchange t) {
+ object = t;
+ notifyAll();
+ }
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ cancel = true;
+ return false;
+ }
+ public boolean isCancelled() {
+ return cancel;
+ }
+ public boolean isDone() {
+ return object != null;
+ }
+ protected NormalizedMessage extract(MessageExchange me) throws
ExecutionException {
+ if (me.getStatus() == ExchangeStatus.ERROR) {
+ throw new ExecutionException(me.getError());
+ } else if (me.getFault() != null) {
+ throw new ExecutionException(new FaultException("Fault occured",
me, me.getFault()));
+ } else {
+ return me.getMessage("out");
+ }
+ }
+}
\ No newline at end of file
Added:
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java?view=auto&rev=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
(added)
+++
incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/Request.java
Sun Oct 15 12:57:56 2006
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicemix.bean.support;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jbi.messaging.MessageExchange;
+
+public class Request {
+ private Object bean;
+ private MessageExchange exchange;
+ private Set<String> sentExchanges = new HashSet<String>();
+ // Keep track of callbacks already called, so that the same callback
+ // can not be called twice
+ private Map<Method, Boolean> callbacks = new HashMap<Method, Boolean>();
+
+ public Request() {
+ }
+
+ public Request(Object bean, MessageExchange exchange) {
+ this.bean = bean;
+ this.exchange = exchange;
+ }
+
+ /**
+ * @return the bean
+ */
+ public Object getBean() {
+ return bean;
+ }
+ /**
+ * @param bean the bean to set
+ */
+ public void setBean(Object bean) {
+ this.bean = bean;
+ }
+ /**
+ * @return the exchange
+ */
+ public MessageExchange getExchange() {
+ return exchange;
+ }
+ /**
+ * @param exchange the exchange to set
+ */
+ public void setExchange(MessageExchange exchange) {
+ this.exchange = exchange;
+ }
+ /**
+ * @param id the id of the exchange sent
+ */
+ public void addSentExchange(String id) {
+ sentExchanges.add(id);
+ }
+
+ /**
+ * @return the callbacks
+ */
+ public Map<Method, Boolean> getCallbacks() {
+ return callbacks;
+ }
+
+}
\ No newline at end of file
Added:
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ListenerBeanEndpointTest.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ListenerBeanEndpointTest.java?view=auto&rev=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ListenerBeanEndpointTest.java
(added)
+++
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ListenerBeanEndpointTest.java
Sun Oct 15 12:57:56 2006
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import org.apache.servicemix.bean.beans.ListenerBean;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.resolver.URIResolver;
+import org.apache.servicemix.tck.SpringTestSupport;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.w3c.dom.DocumentFragment;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.servicedesc.ServiceEndpoint;
+
+public class ListenerBeanEndpointTest extends SpringTestSupport {
+
+ public void
testSendingToDynamicEndpointForExchangeProcessorBeanWithFooOperation() throws
Exception {
+ // now lets make a request on this endpoint
+ DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
+
+ DocumentFragment epr = URIResolver.createWSAEPR("bean:listenerBean");
+ ServiceEndpoint se = client.getContext().resolveEndpointReference(epr);
+ assertNotNull("We should find a service endpoint!", se);
+
+ InOnly exchange = client.createInOnlyExchange();
+ exchange.setEndpoint(se);
+ exchange.getInMessage().setContent(new
StringSource("<hello>world</hello>"));
+ client.sendSync(exchange);
+
+ assertExchangeWorked(exchange);
+
+ ListenerBean bean = (ListenerBean) getBean("listenerBean");
+ MessageExchange answer = bean.getLastExchange();
+
+ log.info("Bean's process() method has been invoked: " + answer);
+
+ assertNotNull("Bean's process() method should bave been invoked",
answer);
+ }
+
+ protected void assertExchangeWorked(MessageExchange me) throws Exception {
+ if (me.getStatus() == ExchangeStatus.ERROR) {
+ if (me.getError() != null) {
+ throw me.getError();
+ }
+ else {
+ fail("Received ERROR status");
+ }
+ }
+ else if (me.getFault() != null) {
+ fail("Received fault: " + new
SourceTransformer().toString(me.getFault().getContent()));
+ }
+ }
+
+ protected AbstractXmlApplicationContext createBeanFactory() {
+ return new ClassPathXmlApplicationContext("spring-no-endpoints.xml");
+ }
+
+}
Modified:
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/AnnotatedBean.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/AnnotatedBean.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/AnnotatedBean.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/AnnotatedBean.java
Sun Oct 15 12:57:56 2006
@@ -16,11 +16,11 @@
*/
package org.apache.servicemix.bean.beans;
+import javax.jbi.messaging.MessageExchange;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.bean.ExchangeDriven;
-
-import javax.jbi.messaging.MessageExchange;
+import org.apache.servicemix.bean.Operation;
/**
* A simple POJO which uses an annotation to indicate which method should be
driven by a message exchange
@@ -33,7 +33,7 @@
private MessageExchange myExchangeMethod;
- @ExchangeDriven
+ @Operation(name = "foo")
public void myExchangeMethod(MessageExchange messageExchange) {
this.myExchangeMethod = messageExchange;
Added:
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java?view=auto&rev=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
(added)
+++
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
Sun Oct 15 12:57:56 2006
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.beans;
+
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public class ConsumerListener implements MessageExchangeListener {
+
+ @Resource
+ private DeliveryChannel channel;
+
+ public void onMessageExchange(MessageExchange exchange) throws
MessagingException {
+ MessageExchangeFactory factory = channel.createExchangeFactory();
+ InOut io = factory.createInOutExchange();
+ NormalizedMessage nm = io.createMessage();
+ MessageUtil.transferInToIn(exchange, io);
+ }
+
+}
Modified:
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ListenerBean.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ListenerBean.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ListenerBean.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ListenerBean.java
Sun Oct 15 12:57:56 2006
@@ -20,6 +20,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.MessageExchangeListener;
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
@@ -32,19 +35,22 @@
private static final Log log = LogFactory.getLog(ListenerBean.class);
+ @Resource
+ private DeliveryChannel channel;
+
private MessageExchange lastExchange;
private String param;
- public void onMessageExchange(MessageExchange messageExchange) throws
MessagingException {
- this.lastExchange = messageExchange;
-
- log.info("Received exchange: " + messageExchange);
+ public void onMessageExchange(MessageExchange exchange) throws
MessagingException {
+ this.lastExchange = exchange;
+ log.info("Received exchange: " + exchange);
+ exchange.setStatus(ExchangeStatus.DONE);
+ channel.send(exchange);
}
public MessageExchange getLastExchange() {
return lastExchange;
}
-
public String getParam() {
return param;
Modified:
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/PlainBean.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/PlainBean.java?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/PlainBean.java
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/PlainBean.java
Sun Oct 15 12:57:56 2006
@@ -54,7 +54,6 @@
log.info("methodWithPropertyParameter() called with parameter: " +
name);
}
-
public void methodWithPropertyParameterAndXPath(@Property(name = "person")
String name, @XPath(xpath="/hello/@address") String address) {
this.propertyParameter = name;
this.xpathParameter = address;
Modified:
incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring-no-endpoints.xml
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring-no-endpoints.xml?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
---
incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring-no-endpoints.xml
(original)
+++
incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring-no-endpoints.xml
Sun Oct 15 12:57:56 2006
@@ -34,9 +34,9 @@
</sm:activationSpecs>
</sm:container>
- <bean id="exchangeProcessorBean"
class="org.apache.servicemix.bean.beans.ExchangeProcessorBean"/>
- <bean id="listenerBean"
class="org.apache.servicemix.bean.beans.ListenerBean"/>
- <bean id="annotationsBean"
class="org.apache.servicemix.bean.beans.AnnotatedBean"/>
+ <bean id="consumerBean"
class="org.apache.servicemix.bean.beans.ConsumerBean"/>
+ <bean id="listenerBean"
class="org.apache.servicemix.bean.beans.ListenerBean"/>
+ <bean id="annotatedBean"
class="org.apache.servicemix.bean.beans.AnnotatedBean"/>
<bean id="plainBean" class="org.apache.servicemix.bean.beans.PlainBean"/>
</beans>
Modified:
incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring.xml
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring.xml?view=diff&rev=464261&r1=464260&r2=464261
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring.xml
(original)
+++ incubator/servicemix/trunk/servicemix-bean/src/test/resources/spring.xml
Sun Oct 15 12:57:56 2006
@@ -39,6 +39,6 @@
</sm:container>
<bean id="listenerBean"
class="org.apache.servicemix.bean.beans.ListenerBean"/>
- <bean id="annotationsBean"
class="org.apache.servicemix.bean.beans.AnnotatedBean"/>
+ <bean id="annotatedBean"
class="org.apache.servicemix.bean.beans.AnnotatedBean"/>
</beans>