Title: [703] trunk/core/src/test/java/org/servicemix/jbi/nmr: SM-86: sendSync does not work on pub/sub

Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java (702 => 703)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -133,6 +133,7 @@
      */
     public void setSourceContext(ComponentContextImpl sourceContext) {
         this.sourceContext = sourceContext;
+        this.mirror.sourceContext = sourceContext;
     }
 
     /**

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/Broker.java (702 => 703)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/Broker.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/Broker.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -375,16 +375,17 @@
         }
 
         boolean foundRoute = false;
-        if (packet.getEndpoint() != null) {
+        // If we found a destination, or this is a reply
+        if (packet.getEndpoint() != null || exchange.getRole() == Role.CONSUMER) {
             foundRoute = true;
             flow.send(exchange);
         }
 
         if (exchange.getRole() == Role.PROVIDER) {
-        	foundRoute |= getSubscriptionManager().dispatchToSubscribers(exchange);
+        	getSubscriptionManager().dispatchToSubscribers(exchange);
         }
         
-        if (!foundRoute){
+        if (!foundRoute) {
             boolean throwException = true;
             ActivationSpec activationSpec = exchange.getActivationSpec();
             if (activationSpec != null) {
@@ -393,6 +394,12 @@
             if (throwException) {
                 throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + serviceName + " and interface: "
                         + interfaceName);
+            } else if (exchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+                exchange.handleAccept();
+                ComponentContextImpl ctx = (ComponentContextImpl) getSubscriptionManager().getContext();
+                exchange.setDestinationId(ctx.getComponentNameSpace());
+                // TODO: this will fail if exchange is InOut
+                getSubscriptionManager().done(exchange);
             }
         }
     }

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/SubscriptionManager.java (702 => 703)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/SubscriptionManager.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/SubscriptionManager.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -17,18 +17,25 @@
  **/
 package org.servicemix.jbi.nmr;
 
-import org.activemq.util.IdGenerator;
+import org.servicemix.JbiConstants;
+import org.servicemix.MessageExchangeListener;
+import org.servicemix.components.util.ComponentSupport;
 import org.servicemix.jbi.framework.Registry;
-import org.servicemix.jbi.messaging.ExchangePacket;
-import org.servicemix.jbi.messaging.InOnlyImpl;
 import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.nmr.flow.Flow;
 import org.servicemix.jbi.nmr.flow.FlowProvider;
 import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
 
 import javax.jbi.JBIException;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Handles publish/subscribe style messaging in the NMR.
@@ -36,12 +43,11 @@
  * 
  * @version $Revision$
  */
-public class SubscriptionManager {
+public class SubscriptionManager extends ComponentSupport implements MessageExchangeListener {
     
     private Registry registry;
     private String flowName;
     private Flow flow;
-    private IdGenerator idGenerator = new IdGenerator();
 
     /**
      * Initialize the SubscriptionManager
@@ -52,7 +58,8 @@
         if (this.flow == null) {
             this.flow = FlowProvider.getFlow(flowName);
         }
-        this.registry = registry;        
+        this.registry = registry; 
+        broker.getContainer().activateComponent(this, "#SubscriptionManager#");
     }
 
     /**
@@ -80,13 +87,22 @@
      * @throws JBIException 
      */
     protected void dispatchToSubscriber(MessageExchangeImpl exchange, ServiceEndpointImpl endpoint) throws JBIException {
-        ExchangePacket packet = exchange.getPacket().copy();
-        packet.setExchangeId(idGenerator.generateId());
-        packet.setEndpoint(endpoint);
-        packet.setDestinationId(endpoint.getComponentNameSpace());
-        MessageExchangeImpl me = new InOnlyImpl(packet);
-        me.handleSend(false);
-        flow.send(me.getMirror());
+        DeliveryChannel channel = getDeliveryChannel();
+        InOnly me = channel.createExchangeFactory().createInOnlyExchange();
+        NormalizedMessage in = me.createMessage();
+        getMessageTransformer().transform(me, exchange.getInMessage(), in);
+        me.setInMessage(in);
+        me.setEndpoint(endpoint);
+        Set names = exchange.getPropertyNames();
+        for (Iterator iter = names.iterator(); iter.hasNext();) {
+            String name = (String) iter.next();
+            me.setProperty(name, exchange.getProperty(name));
+        }
+        if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
+            channel.sendSync(me);
+        } else {
+            channel.send(me);
+        }
     }
 
 	public Flow getFlow() {
@@ -104,5 +120,10 @@
 	public void setFlowName(String flowName) {
 		this.flowName = flowName;
 	}
+
+    public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+        // We should only receive done exchanges from subscribers
+        // but we need that so that they can be dequeued
+    }
    
 }

Modified: trunk/core/src/test/java/org/servicemix/examples/Sender.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/examples/Sender.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/examples/Sender.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -17,7 +17,7 @@
  **/
 package org.servicemix.examples;
 
-import javax.jbi.messaging.MessagingException;
+import javax.jbi.JBIException;
 
 /**
  * A simple interface to allow polymorphic access to sending, test components.
@@ -26,7 +26,7 @@
  */
 public interface Sender {
 
-    void sendMessages(int messageCount) throws MessagingException;
+    void sendMessages(int messageCount) throws JBIException;
     
-    void sendMessages(int messageCount, boolean sync) throws MessagingException;
+    void sendMessages(int messageCount, boolean sync) throws JBIException;
 }

Modified: trunk/core/src/test/java/org/servicemix/examples/SenderComponent.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/examples/SenderComponent.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/examples/SenderComponent.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -26,6 +26,7 @@
 import javax.jbi.JBIException;
 import javax.jbi.component.ComponentContext;
 import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
@@ -51,49 +52,40 @@
         this.resolver = resolver;
     }
 
-    public void sendMessages(int messageCount)  {
+    public void sendMessages(int messageCount) throws JBIException {
     	sendMessages(messageCount, false);
     }
         
-    public void sendMessages(int messageCount, boolean sync)  {
-        try {
-            ComponentContext context = getContext();
+    public void sendMessages(int messageCount, boolean sync) throws JBIException {
+        ComponentContext context = getContext();
 
-            for (int i = 0; i < messageCount; i++) {
-                InOnly exchange = context.getDeliveryChannel().createExchangeFactory().createInOnlyExchange();
-                NormalizedMessage message = exchange.createMessage();
+        for (int i = 0; i < messageCount; i++) {
+            InOnly exchange = context.getDeliveryChannel().createExchangeFactory().createInOnlyExchange();
+            NormalizedMessage message = exchange.createMessage();
 
-                ServiceEndpoint destination = null;
-                if (resolver != null) {
-                    destination = resolver.resolveEndpoint(getContext(), exchange, NullEndpointFilter.getInstance());
-                }
-                if (destination != null) {
-                    // lets explicitly specify the destination - otherwise
-                    // we'll let the container choose for us
-                    exchange.setEndpoint(destination);
-                }
+            ServiceEndpoint destination = null;
+            if (resolver != null) {
+                destination = resolver.resolveEndpoint(getContext(), exchange, NullEndpointFilter.getInstance());
+            }
+            if (destination != null) {
+                // lets explicitly specify the destination - otherwise
+                // we'll let the container choose for us
+                exchange.setEndpoint(destination);
+            }
 
-                exchange.setInMessage(message);
-                // lets set the XML as a byte[], String or DOM etc
-                String xml = "<s12:Envelope xmlns:s12='http://www.w3.org/2003/05/soap-envelope'><s12:Body> <foo>Hello!</foo> </s12:Body></s12:Envelope>";
-                message.setContent(new StringSource(xml));
-                if (sync) {
-                	context.getDeliveryChannel().sendSync(exchange, 4000);
-                } else {
-                	context.getDeliveryChannel().send(exchange);
+            exchange.setInMessage(message);
+            // lets set the XML as a byte[], String or DOM etc
+            String xml = "<s12:Envelope xmlns:s12='http://www.w3.org/2003/05/soap-envelope'><s12:Body> <foo>Hello!</foo> </s12:Body></s12:Envelope>";
+            message.setContent(new StringSource(xml));
+            if (sync) {
+            	boolean result = context.getDeliveryChannel().sendSync(exchange, 1000);
+                if (!result) {
+                    throw new MessagingException("Message delivery using sendSync has timed out");
                 }
-                //Thread.sleep(100);
+            } else {
+            	context.getDeliveryChannel().send(exchange);
             }
         }
-        catch (JBIException e) {
-            e.printStackTrace();
-        }
-        /*
-        catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        */
     }
 
 }

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -21,12 +21,15 @@
 import org.servicemix.examples.Receiver;
 import org.servicemix.examples.ReceiverComponent;
 import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 
+import javax.jbi.JBIException;
+
 /**
  * @version $Revision$
  */
@@ -64,7 +67,11 @@
         
         tt.execute(new TransactionCallback() {
 	  		public Object doInTransaction(TransactionStatus status) {
-	  			sender.sendMessages(NUM_MESSAGES, syncSend);
+                try {
+                    sender.sendMessages(NUM_MESSAGES, syncSend);
+                } catch (JBIException e) {
+                    throw new RuntimeJBIException(e);
+                }
 	  			return null;
 	  		}
         });

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -23,12 +23,14 @@
 import org.servicemix.examples.Receiver;
 import org.servicemix.examples.ReceiverComponent;
 import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallback;
 
+import javax.jbi.JBIException;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
@@ -102,7 +104,11 @@
         
         tt.execute(new TransactionCallback() {
 	  		public Object doInTransaction(TransactionStatus status) {
-	  			sender.sendMessages(numMessages, syncSend);
+                try {
+                    sender.sendMessages(numMessages, syncSend);
+                } catch (JBIException e) {
+                    throw new RuntimeJBIException(e);
+                }
 	  			return null;
 	  		}
         });

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractTransactionTest.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractTransactionTest.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractTransactionTest.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -26,6 +26,7 @@
 import org.servicemix.examples.Receiver;
 import org.servicemix.examples.ReceiverComponent;
 import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.nmr.flow.Flow;
@@ -35,6 +36,7 @@
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
+import javax.jbi.JBIException;
 import javax.transaction.TransactionManager;
 import javax.transaction.UserTransaction;
 
@@ -108,7 +110,11 @@
         
         tt.execute(new TransactionCallback() {
 	  		public Object doInTransaction(TransactionStatus status) {
-	  			sender.sendMessages(NUM_MESSAGES, syncSend);
+                try {
+                    sender.sendMessages(NUM_MESSAGES, syncSend);
+                } catch (JBIException e) {
+                    throw new RuntimeJBIException(e);
+                }
 	  			return null;
 	  		}
         });

Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionPropertyCopyTest.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionPropertyCopyTest.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionPropertyCopyTest.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -64,12 +64,16 @@
 			if (subscriptionFlowName != null) {
 				container.getBroker().getSubscriptionManager().setFlow(FlowProvider.getFlow(subscriptionFlowName));
 			}
-			container.setCreateMBeanServer(true);
+            // TODO: check why the following line is enabled, there is 
+            // a 5 seconds pause when Management stuff is initialized
+			//container.setCreateMBeanServer(true);
 			container.init();
 			container.start();
 			
 			Sender sender = new SenderComponent();
-			container.activateComponent(new ActivationSpec("sender", sender));
+            ActivationSpec senderActivationSpec = new ActivationSpec("sender", sender);
+            senderActivationSpec.setFailIfNoDestinationEndpoint(false);
+            container.activateComponent(senderActivationSpec);
 			
 			ReceiverListener receiver1 = new ReceiverListener();
 			container.activateComponent(createReceiverAS("receiver1", receiver1));
@@ -79,7 +83,7 @@
 			
 			sender.sendMessages(1);
 			
-			Thread.sleep(1000);
+			Thread.sleep(100);
 			
             assertFalse(receiver1.isPropertySetOnExchange());
             assertFalse(receiver2.isPropertySetOnExchange());
@@ -98,7 +102,7 @@
 		return as;
 	}
 	
-	public class ReceiverListener extends ReceiverComponent {
+	public static class ReceiverListener extends ReceiverComponent {
         private boolean propertySetOnExchange;
         private boolean propertySetOnMessage;
 		

Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionTest.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionTest.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/SubscriptionTest.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -37,43 +37,71 @@
 
 public class SubscriptionTest extends TestCase {
 
-	public void testStNull() throws Exception {
-		runTest("st", null);
-	}
-	
-	public void testStSt() throws Exception {
-		runTest("st", "st");
-	}
-	
-	public void testStSeda() throws Exception {
-		runTest("st", "seda");
-	}
-	
-	public void testSedaNull() throws Exception {
-		runTest("seda", null);
-	}
-	
-	public void testSedaSt() throws Exception {
-		runTest("seda", "st");
-	}
-	
-	public void testSedaSeda() throws Exception {
-		runTest("seda", "seda");
-	}
-	
-	private void runTest(String flowName, String subscriptionFlowName) throws Exception {
+    public void testStNullAsync() throws Exception {
+        runTest("st", null, false);
+    }
+    
+    public void testStStAsync() throws Exception {
+        runTest("st", "st", false);
+    }
+    
+    public void testStSedaAsync() throws Exception {
+        runTest("st", "seda", false);
+    }
+    
+    public void testSedaNullAsync() throws Exception {
+        runTest("seda", null, false);
+    }
+    
+    public void testSedaStAsync() throws Exception {
+        runTest("seda", "st", false);
+    }
+    
+    public void testSedaSedaAsync() throws Exception {
+        runTest("seda", "seda", false);
+    }
+    
+    public void testStNullSync() throws Exception {
+        runTest("st", null, true);
+    }
+    
+    public void testStStSync() throws Exception {
+        runTest("st", "st", true);
+    }
+    
+    public void testStSedaSync() throws Exception {
+        runTest("st", "seda", true);
+    }
+    
+    public void testSedaNullSync() throws Exception {
+        runTest("seda", null, true);
+    }
+    
+    public void testSedaStSync() throws Exception {
+        runTest("seda", "st", true);
+    }
+    
+    public void testSedaSedaSync() throws Exception {
+        runTest("seda", "seda", true);
+    }
+    
+	private void runTest(String flowName, String subscriptionFlowName, boolean sync) throws Exception {
 		JBIContainer container = new JBIContainer();
 		try {
 			container.getBroker().setFlow(FlowProvider.getFlow(flowName));
 			if (subscriptionFlowName != null) {
 				container.getBroker().getSubscriptionManager().setFlow(FlowProvider.getFlow(subscriptionFlowName));
 			}
-			container.setCreateMBeanServer(true);
+            // TODO: check why the following line is enabled, there is 
+            // a 5 seconds pause when Management stuff is initialized
+			//container.setCreateMBeanServer(true);
 			container.init();
 			container.start();
 			
 			SenderListener sender = new SenderListener();
-			container.activateComponent(new ActivationSpec("sender", sender));
+            ActivationSpec senderActivationSpec = new ActivationSpec("sender", sender);
+            senderActivationSpec.setFailIfNoDestinationEndpoint(false);
+			container.activateComponent(senderActivationSpec);
 			
 			Receiver receiver1 = new ReceiverComponent();
 			container.activateComponent(createReceiverAS("receiver1", receiver1));
@@ -81,13 +109,13 @@
 			Receiver receiver2 = new ReceiverComponent();
 			container.activateComponent(createReceiverAS("receiver2", receiver2));
 			
-			sender.sendMessages(1);
+			sender.sendMessages(1, sync);
 			
-			Thread.sleep(1000);
+			Thread.sleep(100);
 			
 			assertEquals(1, receiver1.getMessageList().getMessageCount());
 			assertEquals(1, receiver2.getMessageList().getMessageCount());
-			assertEquals(2, sender.responses.size());
+			assertEquals(0, sender.responses.size());
 		} finally {
 			container.shutDown();
 		}

Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java (702 => 703)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java	2005-10-29 01:04:48 UTC (rev 702)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java	2005-10-29 01:22:52 UTC (rev 703)
@@ -26,6 +26,7 @@
 import org.jencks.factory.TransactionManagerFactoryBean;
 import org.servicemix.examples.ReceiverComponent;
 import org.servicemix.examples.SenderComponent;
+import org.servicemix.jbi.RuntimeJBIException;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
@@ -35,6 +36,7 @@
 import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
+import javax.jbi.JBIException;
 import javax.transaction.TransactionManager;
 import javax.transaction.UserTransaction;
 
@@ -123,7 +125,11 @@
     	senderContainer.setAutoEnlistInTransaction(true);
         tt.execute(new TransactionCallback() {
 	  		public Object doInTransaction(TransactionStatus status) {
-	  			sender.sendMessages(NUM_MESSAGES);
+                try {
+                    sender.sendMessages(NUM_MESSAGES);
+                } catch (JBIException e) {
+                    throw new RuntimeJBIException(e);
+                }
 	  			return null;
 	  		}
         });

Reply via email to