Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java (702 => 703)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -133,6 +133,7 @@
*/
public void setSourceContext(ComponentContextImpl sourceContext) {
this.sourceContext = sourceContext;
+ this.mirror.sourceContext = sourceContext;
}
/**
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/Broker.java (702 => 703)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/Broker.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/Broker.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -375,16 +375,17 @@
}
boolean foundRoute = false;
- if (packet.getEndpoint() != null) {
+ // If we found a destination, or this is a reply
+ if (packet.getEndpoint() != null || exchange.getRole() == Role.CONSUMER) {
foundRoute = true;
flow.send(exchange);
}
if (exchange.getRole() == Role.PROVIDER) {
- foundRoute |= getSubscriptionManager().dispatchToSubscribers(exchange);
+ getSubscriptionManager().dispatchToSubscribers(exchange);
}
- if (!foundRoute){
+ if (!foundRoute) {
boolean throwException = true;
ActivationSpec activationSpec = exchange.getActivationSpec();
if (activationSpec != null) {
@@ -393,6 +394,12 @@
if (throwException) {
throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + serviceName + " and interface: "
+ interfaceName);
+ } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+ exchange.handleAccept();
+ ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext();
+ exchange.setDestinationId(ctx.getComponentNameSpace());
+ // TODO: this will fail if the exchange is InOut
+ getSubscriptionManager().done(exchange);
}
}
}
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/SubscriptionManager.java (702 => 703)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/SubscriptionManager.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/SubscriptionManager.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -17,18 +17,25 @@
**/
package org.servicemix.jbi.nmr;
-import org.activemq.util.IdGenerator;
+import org.servicemix.JbiConstants;
+import org.servicemix.MessageExchangeListener;
+import org.servicemix.components.util.ComponentSupport;
import org.servicemix.jbi.framework.Registry;
-import org.servicemix.jbi.messaging.ExchangePacket;
-import org.servicemix.jbi.messaging.InOnlyImpl;
import org.servicemix.jbi.messaging.MessageExchangeImpl;
import org.servicemix.jbi.nmr.flow.Flow;
import org.servicemix.jbi.nmr.flow.FlowProvider;
import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
import javax.jbi.JBIException;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
/**
* Handles publish/subscribe style messaging in the NMR.
@@ -36,12 +43,11 @@
*
* @version $Revision$
*/
-public class SubscriptionManager {
+public class SubscriptionManager extends ComponentSupport implements MessageExchangeListener {
private Registry registry;
private String flowName;
private Flow flow;
- private IdGenerator idGenerator = new IdGenerator();
/**
* Initialize the SubscriptionManager
@@ -52,7 +58,8 @@
if (this.flow == null) {
this.flow = FlowProvider.getFlow(flowName);
}
- this.registry = registry;
+ this.registry = registry;
+ broker.getContainer().activateComponent(this, "#SubscriptionManager#");
}
/**
@@ -80,13 +87,22 @@
* @throws JBIException
*/
protected void dispatchToSubscriber(MessageExchangeImpl exchange, ServiceEndpointImpl endpoint) throws JBIException {
- ExchangePacket packet = exchange.getPacket().copy();
- packet.setExchangeId(idGenerator.generateId());
- packet.setEndpoint(endpoint);
- packet.setDestinationId(endpoint.getComponentNameSpace());
- MessageExchangeImpl me = new InOnlyImpl(packet);
- me.handleSend(false);
- flow.send(me.getMirror());
+ DeliveryChannel channel = getDeliveryChannel();
+ InOnly me = channel.createExchangeFactory().createInOnlyExchange();
+ NormalizedMessage in = me.createMessage();
+ getMessageTransformer().transform(me, exchange.getInMessage(), in);
+ me.setInMessage(in);
+ me.setEndpoint(endpoint);
+ Set names = exchange.getPropertyNames();
+ for (Iterator iter = names.iterator(); iter.hasNext();) {
+ String name = (String) iter.next();
+ me.setProperty(name, exchange.getProperty(name));
+ }
+ if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
+ channel.sendSync(me);
+ } else {
+ channel.send(me);
+ }
}
public Flow getFlow() {
@@ -104,5 +120,10 @@
public void setFlowName(String flowName) {
this.flowName = flowName;
}
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ // We should only receive done exchanges from subscribers,
+ // but we still need this callback so that they can be dequeued
+ }
}
Modified: trunk/core/src/test/java/org/servicemix/examples/Sender.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/examples/Sender.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/examples/Sender.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -17,7 +17,7 @@
**/
package org.servicemix.examples;
-import javax.jbi.messaging.MessagingException;
+import javax.jbi.JBIException;
/**
* A simple interface to allow polymorphic access to sending, test components.
@@ -26,7 +26,7 @@
*/
public interface Sender {
- void sendMessages(int messageCount) throws MessagingException;
+ void sendMessages(int messageCount) throws JBIException;
- void sendMessages(int messageCount, boolean sync) throws MessagingException;
+ void sendMessages(int messageCount, boolean sync) throws JBIException;
}
Modified: trunk/core/src/test/java/org/servicemix/examples/SenderComponent.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/examples/SenderComponent.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/examples/SenderComponent.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -26,6 +26,7 @@
import javax.jbi.JBIException;
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
@@ -51,49 +52,40 @@
this.resolver = resolver;
}
- public void sendMessages(int messageCount) {
+ public void sendMessages(int messageCount) throws JBIException {
sendMessages(messageCount, false);
}
- public void sendMessages(int messageCount, boolean sync) {
- try {
- ComponentContext context = getContext();
+ public void sendMessages(int messageCount, boolean sync) throws JBIException {
+ ComponentContext context = getContext();
- for (int i = 0; i < messageCount; i++) {
- InOnly exchange = context.getDeliveryChannel().createExchangeFactory().createInOnlyExchange();
- NormalizedMessage message = exchange.createMessage();
+ for (int i = 0; i < messageCount; i++) {
+ InOnly exchange = context.getDeliveryChannel().createExchangeFactory().createInOnlyExchange();
+ NormalizedMessage message = exchange.createMessage();
- ServiceEndpoint destination = null;
- if (resolver != null) {
- destination = resolver.resolveEndpoint(getContext(), exchange, NullEndpointFilter.getInstance());
- }
- if (destination != null) {
- // lets explicitly specify the destination - otherwise
- // we'll let the container choose for us
- exchange.setEndpoint(destination);
- }
+ ServiceEndpoint destination = null;
+ if (resolver != null) {
+ destination = resolver.resolveEndpoint(getContext(), exchange, NullEndpointFilter.getInstance());
+ }
+ if (destination != null) {
+ // let's explicitly specify the destination - otherwise
+ // we'll let the container choose for us
+ exchange.setEndpoint(destination);
+ }
- exchange.setInMessage(message);
- // lets set the XML as a byte[], String or DOM etc
- String xml = "<s12:Envelope xmlns:s12='http://www.w3.org/2003/05/soap-envelope'><s12:Body> <foo>Hello!</foo> </s12:Body></s12:Envelope>";
- message.setContent(new StringSource(xml));
- if (sync) {
- context.getDeliveryChannel().sendSync(exchange, 4000);
- } else {
- context.getDeliveryChannel().send(exchange);
+ exchange.setInMessage(message);
+ // let's set the XML as a byte[], String or DOM etc
+ String xml = "<s12:Envelope xmlns:s12='http://www.w3.org/2003/05/soap-envelope'><s12:Body> <foo>Hello!</foo> </s12:Body></s12:Envelope>";
+ message.setContent(new StringSource(xml));
+ if (sync) {
+ boolean result = context.getDeliveryChannel().sendSync(exchange, 1000);
+ if (!result) {
+ throw new MessagingException("Message delivery using sendSync has timed out");
}
- //Thread.sleep(100);
+ } else {
+ context.getDeliveryChannel().send(exchange);
}
}
- catch (JBIException e) {
- e.printStackTrace();
- }
- /*
- catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- */
}
}
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -21,12 +21,15 @@
import org.servicemix.examples.Receiver;
import org.servicemix.examples.ReceiverComponent;
import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
import org.servicemix.jbi.container.ActivationSpec;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
+import javax.jbi.JBIException;
+
/**
* @version $Revision$
*/
@@ -64,7 +67,11 @@
tt.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
- sender.sendMessages(NUM_MESSAGES, syncSend);
+ try {
+ sender.sendMessages(NUM_MESSAGES, syncSend);
+ } catch (JBIException e) {
+ throw new RuntimeJBIException(e);
+ }
return null;
}
});
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -23,12 +23,14 @@
import org.servicemix.examples.Receiver;
import org.servicemix.examples.ReceiverComponent;
import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
import org.servicemix.jbi.container.ActivationSpec;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
+import javax.jbi.JBIException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
@@ -102,7 +104,11 @@
tt.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
- sender.sendMessages(numMessages, syncSend);
+ try {
+ sender.sendMessages(numMessages, syncSend);
+ } catch (JBIException e) {
+ throw new RuntimeJBIException(e);
+ }
return null;
}
});
Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractTransactionTest.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractTransactionTest.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractTransactionTest.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -26,6 +26,7 @@
import org.servicemix.examples.Receiver;
import org.servicemix.examples.ReceiverComponent;
import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
import org.servicemix.jbi.container.ActivationSpec;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.nmr.flow.Flow;
@@ -35,6 +36,7 @@
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
+import javax.jbi.JBIException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
@@ -108,7 +110,11 @@
tt.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
- sender.sendMessages(NUM_MESSAGES, syncSend);
+ try {
+ sender.sendMessages(NUM_MESSAGES, syncSend);
+ } catch (JBIException e) {
+ throw new RuntimeJBIException(e);
+ }
return null;
}
});
Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionPropertyCopyTest.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionPropertyCopyTest.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionPropertyCopyTest.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -64,12 +64,16 @@
if (subscriptionFlowName != null) {
container.getBroker().getSubscriptionManager().setFlow(FlowProvider.getFlow(subscriptionFlowName));
}
- container.setCreateMBeanServer(true);
+ // TODO: check why the following line was enabled; there is
+ // a 5-second pause when the Management stuff is initialized
+ //container.setCreateMBeanServer(true);
container.init();
container.start();
Sender sender = new SenderComponent();
- container.activateComponent(new ActivationSpec("sender", sender));
+ ActivationSpec senderActivationSpec = new ActivationSpec("sender", sender);
+ senderActivationSpec.setFailIfNoDestinationEndpoint(false);
+ container.activateComponent(senderActivationSpec);
ReceiverListener receiver1 = new ReceiverListener();
container.activateComponent(createReceiverAS("receiver1", receiver1));
@@ -79,7 +83,7 @@
sender.sendMessages(1);
- Thread.sleep(1000);
+ Thread.sleep(100);
assertFalse(receiver1.isPropertySetOnExchange());
assertFalse(receiver2.isPropertySetOnExchange());
@@ -98,7 +102,7 @@
return as;
}
- public class ReceiverListener extends ReceiverComponent {
+ public static class ReceiverListener extends ReceiverComponent {
private boolean propertySetOnExchange;
private boolean propertySetOnMessage;
Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionTest.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionTest.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionTest.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -37,43 +37,71 @@
public class SubscriptionTest extends TestCase {
- public void testStNull() throws Exception {
- runTest("st", null);
- }
-
- public void testStSt() throws Exception {
- runTest("st", "st");
- }
-
- public void testStSeda() throws Exception {
- runTest("st", "seda");
- }
-
- public void testSedaNull() throws Exception {
- runTest("seda", null);
- }
-
- public void testSedaSt() throws Exception {
- runTest("seda", "st");
- }
-
- public void testSedaSeda() throws Exception {
- runTest("seda", "seda");
- }
-
- private void runTest(String flowName, String subscriptionFlowName) throws Exception {
+ public void testStNullAsync() throws Exception {
+ runTest("st", null, false);
+ }
+
+ public void testStStAsync() throws Exception {
+ runTest("st", "st", false);
+ }
+
+ public void testStSedaAsync() throws Exception {
+ runTest("st", "seda", false);
+ }
+
+ public void testSedaNullAsync() throws Exception {
+ runTest("seda", null, false);
+ }
+
+ public void testSedaStAsync() throws Exception {
+ runTest("seda", "st", false);
+ }
+
+ public void testSedaSedaAsync() throws Exception {
+ runTest("seda", "seda", false);
+ }
+
+ public void testStNullSync() throws Exception {
+ runTest("st", null, true);
+ }
+
+ public void testStStSync() throws Exception {
+ runTest("st", "st", true);
+ }
+
+ public void testStSedaSync() throws Exception {
+ runTest("st", "seda", true);
+ }
+
+ public void testSedaNullSync() throws Exception {
+ runTest("seda", null, true);
+ }
+
+ public void testSedaStSync() throws Exception {
+ runTest("seda", "st", true);
+ }
+
+ public void testSedaSedaSync() throws Exception {
+ runTest("seda", "seda", true);
+ }
+
+ private void runTest(String flowName, String subscriptionFlowName, boolean sync) throws Exception {
JBIContainer container = new JBIContainer();
try {
container.getBroker().setFlow(FlowProvider.getFlow(flowName));
if (subscriptionFlowName != null) {
container.getBroker().getSubscriptionManager().setFlow(FlowProvider.getFlow(subscriptionFlowName));
}
- container.setCreateMBeanServer(true);
+ // TODO: check why the following line was enabled; there is
+ // a 5-second pause when the Management stuff is initialized
+ //container.setCreateMBeanServer(true);
container.init();
container.start();
SenderListener sender = new SenderListener();
- container.activateComponent(new ActivationSpec("sender", sender));
+ ActivationSpec senderActivationSpec = new ActivationSpec("sender", sender);
+ senderActivationSpec.setFailIfNoDestinationEndpoint(false);
+ container.activateComponent(senderActivationSpec);
Receiver receiver1 = new ReceiverComponent();
container.activateComponent(createReceiverAS("receiver1", receiver1));
@@ -81,13 +109,13 @@
Receiver receiver2 = new ReceiverComponent();
container.activateComponent(createReceiverAS("receiver2", receiver2));
- sender.sendMessages(1);
+ sender.sendMessages(1, sync);
- Thread.sleep(1000);
+ Thread.sleep(100);
assertEquals(1, receiver1.getMessageList().getMessageCount());
assertEquals(1, receiver2.getMessageList().getMessageCount());
- assertEquals(2, sender.responses.size());
+ assertEquals(0, sender.responses.size());
} finally {
container.shutDown();
}
Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java (702 => 703)
--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java 2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java 2005-10-29 01:22:52 UTC (rev 703)
@@ -26,6 +26,7 @@
import org.jencks.factory.TransactionManagerFactoryBean;
import org.servicemix.examples.ReceiverComponent;
import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
import org.servicemix.jbi.container.ActivationSpec;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
@@ -35,6 +36,7 @@
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
+import javax.jbi.JBIException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
@@ -123,7 +125,11 @@
senderContainer.setAutoEnlistInTransaction(true);
tt.execute(new TransactionCallback() {
public Object doInTransaction(TransactionStatus status) {
- sender.sendMessages(NUM_MESSAGES);
+ try {
+ sender.sendMessages(NUM_MESSAGES);
+ } catch (JBIException e) {
+ throw new RuntimeJBIException(e);
+ }
return null;
}
});