Title: [749] trunk/core/src/test/java/org/servicemix/examples: Jms / jca flows now inherit from AbstractFlow instead of SedaFlow.

Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java (748 => 749)

--- trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -134,6 +134,7 @@
             if (lcc.isPojo()){
                 lcc.getComponent().getLifeCycle().shutDown();
             }
+            lcc.getDeliveryChannel().close();
         }
         super.shutDown();
         container.getManagementContext().unregisterMBean(this);

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java (748 => 749)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/DeliveryChannelImpl.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -18,6 +18,7 @@
  **/
 package org.servicemix.jbi.messaging;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 import org.activemq.util.IdGenerator;
@@ -45,6 +46,10 @@
 import javax.transaction.TransactionManager;
 import javax.xml.namespace.QName;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 /**
  * DeliveryChannel implementation
  * 
@@ -53,6 +58,7 @@
 public class DeliveryChannelImpl implements DeliveryChannel {
     
     private static final Log log = LogFactory.getLog(DeliveryChannel.class);
+    
     private JBIContainer container;
     private ComponentContextImpl context;
     private LocalComponentConnector componentConnector;
@@ -67,6 +73,7 @@
     private long lastSendTime = System.currentTimeMillis();
     private long lastReceiveTime = System.currentTimeMillis();
     private AtomicBoolean closed = new AtomicBoolean(false);
+    private Map waiters = new ConcurrentHashMap();
 
     /**
      * Constructor
@@ -109,6 +116,23 @@
      */
     public void close() throws MessagingException {
         if (this.closed.compareAndSet(false, true)) {
+            List pending = queue.closeAndFlush();
+            for (Iterator iter = pending.iterator(); iter.hasNext();) {
+                MessageExchangeImpl messageExchange = (MessageExchangeImpl) iter.next();
+                if (messageExchange.getTransactionContext() != null && messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+                    synchronized (messageExchange.getMirror()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Notifying: " + messageExchange.getExchangeId());
+                        }
+                        messageExchange.getMirror().notify();
+                    }
+                }
+            }
+            // Interrupt all blocked thread
+            Object[] threads = waiters.keySet().toArray();
+            for (int i = 0; i < threads.length; i++) {
+                ((Thread) threads[i]).interrupt();
+            }
             // TODO: deactivate all endpoints from this component
             // TODO: Cause all accepts to return null
             // TODO: Abort all pending exchanges
@@ -227,10 +251,16 @@
         try {
             checkNotClosed();
         	MessageExchangeImpl me = (MessageExchangeImpl) queue.take();
+            if (log.isDebugEnabled()) {
+                log.debug("Accepting " + me.getExchangeId() + " in " + this);
+            }
         	resumeTx(me);
             me.handleAccept();
         	return me;
         }
+        catch (IllegalStateException e) {
+            throw new MessagingException("DeliveryChannel has been closed.");
+        }
         catch (InterruptedException e) {
             throw new MessagingException("accept failed", e);
         }
@@ -251,8 +281,14 @@
                 // If the exchange has already timed out,
                 // do not give it to the component
                 if (me.getPacket().isAborted()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Aborted " + me.getExchangeId() + " in " + this);
+                    }
                     me = null;
                 } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Accepting " + me.getExchangeId() + " in " + this);
+                    }
             		resumeTx(me);
                     me.handleAccept();
                 }
@@ -265,47 +301,67 @@
     }
 
     protected void doSend(MessageExchangeImpl messageExchange, boolean sync) throws MessagingException {
-        // If the message has timed out
-        if (messageExchange.getPacket().isAborted()) {
-            throw new ExchangeTimeoutException(messageExchange);
-        }
-        // Auto enlist exchange in transaction
-        autoEnlistInTx(messageExchange);
-        // Update persistence info
-        Boolean persistent = messageExchange.getPersistent();
-        if (persistent == null) {
-        	if (context.getActivationSpec().getPersistent() != null) {
-        		persistent = context.getActivationSpec().getPersistent();
-        	} else {
-        		persistent = Boolean.valueOf(context.getContainer().isPersistent());
-        	}
-        	messageExchange.setPersistent(persistent);
-        }
-        
-        if (exchangeThrottling) {
-            if (throttlingInterval > intervalCount) {
-                intervalCount = 0;
-                try {
-                    Thread.sleep(throttlingTimeout);
+        try {
+            // If the delivery channel has been closed
+            checkNotClosed();
+            // If the message has timed out
+            if (messageExchange.getPacket().isAborted()) {
+                throw new ExchangeTimeoutException(messageExchange);
+            }
+            // Auto enlist exchange in transaction
+            autoEnlistInTx(messageExchange);
+            // Update persistence info
+            Boolean persistent = messageExchange.getPersistent();
+            if (persistent == null) {
+            	if (context.getActivationSpec().getPersistent() != null) {
+            		persistent = context.getActivationSpec().getPersistent();
+            	} else {
+            		persistent = Boolean.valueOf(context.getContainer().isPersistent());
+            	}
+            	messageExchange.setPersistent(persistent);
+            }
+            
+            if (exchangeThrottling) {
+                if (throttlingInterval > intervalCount) {
+                    intervalCount = 0;
+                    try {
+                        Thread.sleep(throttlingTimeout);
+                    }
+                    catch (InterruptedException e) {
+                        log.warn("throttling failed", e);
+                    }
                 }
-                catch (InterruptedException e) {
-                    log.warn("throttling failed", e);
+                intervalCount++;
+            }
+            
+            long currentTime = System.currentTimeMillis();
+            messagingStats.getOutboundExchanges().increment();
+            messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
+            lastSendTime = currentTime;
+            if (messageExchange.getRole() == Role.CONSUMER) {
+                messageExchange.setSourceId(componentConnector.getComponentNameSpace());
+            }
+    
+            messageExchange.handleSend(sync);
+            container.sendExchange(messageExchange.getMirror());
+        } catch (MessagingException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception processing: " + messageExchange.getExchangeId() + " in " + this);
+            }
+            if (messageExchange.getTransactionContext() != null) {
+                suspendTx(messageExchange);
+                if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Notifying: " + messageExchange.getExchangeId() + " in " + this);
+                    }
+                    synchronized (messageExchange.getMirror()) {
+                        messageExchange.getMirror().notify();
+                    }
                 }
             }
-            intervalCount++;
+            throw e;
         }
         
-        long currentTime = System.currentTimeMillis();
-        messagingStats.getOutboundExchanges().increment();
-        messagingStats.getOutboundExchangeRate().addTime(currentTime - lastSendTime);
-        lastSendTime = currentTime;
-        if (messageExchange.getRole() == Role.CONSUMER) {
-            messageExchange.setSourceId(componentConnector.getComponentNameSpace());
-        }
-
-        messageExchange.handleSend(sync);
-        container.sendExchange(messageExchange.getMirror());
-        
         /*
         if (messageExchange.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
             synchronized (messageExchange.getMirror()) {
@@ -324,7 +380,6 @@
      * @throws MessagingException
      */
     public void send(MessageExchange messageExchange) throws MessagingException {
-        checkNotClosed();
         messageExchange.setProperty(JbiConstants.SEND_SYNC, null);
     	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
    		doSend(messageExchangeImpl, false);
@@ -350,7 +405,9 @@
      * @throws MessagingException
      */
     public boolean sendSync(MessageExchange messageExchange, long timeoutMS) throws MessagingException {
-        checkNotClosed();
+        if (log.isDebugEnabled()) {
+            log.debug("Sending " + messageExchange.getExchangeId() + " in " + this);
+        }
         // JBI 5.5.2.1.3: set the sendSync property
         messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
     	MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
@@ -528,8 +585,24 @@
                         suspendTx(me);
                         synchronized (me.getMirror()) {
                             me.getMirror().setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_SENT);
+                            if (log.isDebugEnabled()) {
+                                log.debug("Queuing: " + me.getExchangeId() + " in " + this);
+                            }
                             queue.put(me);
-                            me.getMirror().wait(Long.MAX_VALUE);
+                            if (log.isDebugEnabled()) {
+                                log.debug("Waiting: " + me.getExchangeId() + " in " + this);
+                            }
+                            // If the channel is closed while here,
+                            // we must abort
+                            waiters.put(Thread.currentThread(), Boolean.TRUE);
+                            try {
+                                me.getMirror().wait();
+                            } finally {
+                                waiters.remove(Thread.currentThread());
+                            }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Notified: " + me.getExchangeId() + " in " + this);
+                            }
                         }
                         resumeTx(me);
                     } else {

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java (748 => 749)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jca/JCAFlow.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -43,10 +43,11 @@
 import org.servicemix.jbi.framework.ComponentNameSpace;
 import org.servicemix.jbi.framework.ComponentPacket;
 import org.servicemix.jbi.framework.ComponentPacketEvent;
+import org.servicemix.jbi.framework.ComponentPacketEventListener;
 import org.servicemix.jbi.framework.LocalComponentConnector;
 import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.nmr.Broker;
-import org.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.servicemix.jbi.nmr.flow.AbstractFlow;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 
@@ -78,10 +79,11 @@
  * 
  * @version $Revision$
  */
-public class JCAFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
+public class JCAFlow extends AbstractFlow implements ConsumerAdvisoryEventListener, MessageListener, ComponentPacketEventListener {
+    
     private static final Log log = LogFactory.getLog(JCAFlow.class);
     private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
-    private String jmsURL = "peer://org.servicemix?persistent=false";
+    private String jmsURL = "tcp://localhost:61616";
     private String userName;
     private String password;
     private ConnectionFactory connectionFactory;
@@ -201,6 +203,7 @@
      */
     public void init(Broker broker, String subType) throws JBIException {
         super.init(broker, subType);
+        broker.getRegistry().addComponentPacketListener(this);
         try {
         	resourceAdapter = createResourceAdapter();
         	
@@ -327,23 +330,6 @@
     }
 
     /**
-     * Distribute an ExchangePacket
-     * 
-     * @param packet
-     * @throws JBIException
-     */
-    protected void doSend(MessageExchangeImpl me) throws JBIException {
-        if (me.getRole() == Role.PROVIDER 
-        		&& me.getTransactionContext() == null 
-        		&& !isPersistent(me)) {
-        	enqueuePacket(me);
-        }
-        else {
-            doRouting(me);
-        }
-    }
-    
-    /**
      * Ability for this flow to persist exchanges.
      * 
      * @return <code>true</code> if this flow can persist messages
@@ -358,7 +344,6 @@
      * @param event
      */
     public void onEvent(final ComponentPacketEvent event) {
-        super.onEvent(event);
         try {
             String componentName = event.getPacket().getComponentNameSpace().getName();
             if (event.getStatus() == ComponentPacketEvent.ACTIVATED){          
@@ -419,6 +404,16 @@
      * @param packet
      * @throws MessagingException
      */
+    protected void doSend(MessageExchangeImpl me) throws MessagingException {
+        doRouting(me);
+    }
+    
+    /**
+     * Distribute an ExchangePacket
+     * 
+     * @param packet
+     * @throws MessagingException
+     */
     public void doRouting(final MessageExchangeImpl me) throws MessagingException {
         
         ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();

Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (748 => 749)

--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -34,10 +34,11 @@
 import org.servicemix.jbi.framework.ComponentNameSpace;
 import org.servicemix.jbi.framework.ComponentPacket;
 import org.servicemix.jbi.framework.ComponentPacketEvent;
+import org.servicemix.jbi.framework.ComponentPacketEventListener;
 import org.servicemix.jbi.framework.LocalComponentConnector;
 import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.nmr.Broker;
-import org.servicemix.jbi.nmr.flow.seda.SedaFlow;
+import org.servicemix.jbi.nmr.flow.AbstractFlow;
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.MessagingException;
@@ -62,7 +63,8 @@
  * 
  * @version $Revision$
  */
-public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
+public class JMSFlow extends AbstractFlow implements ConsumerAdvisoryEventListener, MessageListener, ComponentPacketEventListener {
+    
     private static final Log log = LogFactory.getLog(JMSFlow.class);
     private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
     private String jmsURL = "peer://org.servicemix?persistent=false";
@@ -170,6 +172,7 @@
      */
     public void init(Broker broker, String subType) throws JBIException {
         super.init(broker, subType);
+        broker.getRegistry().addComponentPacketListener(this);
         try {
             if (connectionFactory == null) {
                 if (jmsURL != null) {
@@ -265,26 +268,11 @@
     }
 
     /**
-     * Distribute an ExchangePacket
-     * 
-     * @param packet
-     * @throws JBIException
-     */
-    protected void doSend(MessageExchangeImpl me) throws JBIException {
-        if (me.getTransactionContext() == null) {
-            enqueuePacket(me);
-        } else {
-            doRouting(me);
-        }
-    }
-
-    /**
      * Process state changes in Components
      * 
      * @param event
      */
     public void onEvent(ComponentPacketEvent event) {
-        super.onEvent(event);
         try {
             String componentName = event.getPacket().getComponentNameSpace().getName();
             if (event.getStatus() == ComponentPacketEvent.ACTIVATED){
@@ -336,6 +324,16 @@
      * @param packet
      * @throws MessagingException
      */
+    protected void doSend(MessageExchangeImpl me) throws MessagingException {
+        doRouting(me);
+    }
+    
+    /**
+     * Distribute an ExchangePacket
+     * 
+     * @param packet
+     * @throws MessagingException
+     */
     public void doRouting(MessageExchangeImpl me) throws MessagingException{
         ComponentNameSpace id=me.getRole()==Role.PROVIDER?me.getDestinationId():me.getSourceId();
         ComponentConnector cc=broker.getRegistry().getComponentConnector(id);

Modified: trunk/core/src/main/java/org/servicemix/jbi/util/BoundedLinkedQueue.java (748 => 749)

--- trunk/core/src/main/java/org/servicemix/jbi/util/BoundedLinkedQueue.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/main/java/org/servicemix/jbi/util/BoundedLinkedQueue.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -17,6 +17,9 @@
  **/
 package org.servicemix.jbi.util;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class BoundedLinkedQueue {
 
     public static class LinkedNode {
@@ -86,6 +89,9 @@
 
     /** Number of takes since last reconcile * */
     protected int takeSidePutPermits_ = 0;
+    
+    /** Close flag */
+    protected volatile boolean closed; 
 
     /**
      * Create a queue with the given capacity
@@ -184,6 +190,8 @@
     }
 
     public Object peek() {
+        if (closed)
+            throw new IllegalStateException("Channel is closed");
         synchronized (head_) {
             LinkedNode first = head_.next;
             if (first != null)
@@ -196,6 +204,9 @@
     public Object take() throws InterruptedException {
         if (Thread.interrupted())
             throw new InterruptedException();
+        if (closed)
+            throw new IllegalStateException("Channel is closed");
+        
         Object x = extract();
         if (x != null)
             return x;
@@ -207,6 +218,8 @@
                         if (x != null) {
                             return x;
                         } else {
+                            if (closed)
+                                throw new IllegalStateException("Channel is closed");
                             takeGuard_.wait();
                         }
                     }
@@ -221,6 +234,9 @@
     public Object poll(long msecs) throws InterruptedException {
         if (Thread.interrupted())
             throw new InterruptedException();
+        if (closed)
+            throw new IllegalStateException("Channel is closed");
+
         Object x = extract();
         if (x != null)
             return x;
@@ -234,6 +250,8 @@
                         if (x != null || waitTime <= 0) {
                             return x;
                         } else {
+                            if (closed)
+                                throw new IllegalStateException("Channel is closed");
                             takeGuard_.wait(waitTime);
                             waitTime = msecs
                                     - (System.currentTimeMillis() - start);
@@ -275,6 +293,8 @@
             throw new IllegalArgumentException();
         if (Thread.interrupted())
             throw new InterruptedException();
+        if (closed)
+            throw new IllegalStateException("Channel is closed");
 
         synchronized (putGuard_) {
 
@@ -283,6 +303,8 @@
                     if (reconcilePutPermits() <= 0) {
                         try {
                             for (;;) {
+                                if (closed)
+                                    throw new IllegalStateException("Channel is closed");
                                 wait();
                                 if (reconcilePutPermits() > 0) {
                                     break;
@@ -306,6 +328,8 @@
             throw new IllegalArgumentException();
         if (Thread.interrupted())
             throw new InterruptedException();
+        if (closed)
+            throw new IllegalStateException("Channel is closed");
 
         synchronized (putGuard_) {
 
@@ -320,6 +344,8 @@
                                 long start = System.currentTimeMillis();
 
                                 for (;;) {
+                                    if (closed)
+                                        throw new IllegalStateException("Channel is closed");
                                     wait(waitTime);
                                     if (reconcilePutPermits() > 0) {
                                         break;
@@ -352,5 +378,30 @@
             return head_.next == null;
         }
     }
+    
+    public synchronized List closeAndFlush() {
+        // Set this queue as closed
+        closed = true;
+        // No more puts is allowed
+        synchronized (putGuard_) {
+            synchronized (this) {
+                takeSidePutPermits_ -= capacity_;
+                capacity_ = 0;
 
+                // Force immediate reconcilation.
+                reconcilePutPermits();
+                notifyAll();
+            }
+        }
+        synchronized (takeGuard_) {
+            takeGuard_.notifyAll();
+        }
+        ArrayList l = new ArrayList();
+        Object o;
+        while ((o = extract()) != null) {
+            l.add(o);
+        }
+        return l;
+    }
+
 }

Modified: trunk/core/src/test/java/org/servicemix/examples/AsyncReceiverPojo.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/examples/AsyncReceiverPojo.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/examples/AsyncReceiverPojo.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -86,18 +86,18 @@
     // Runnable interface
     //-------------------------------------------------------------------------
     public void run() {
-        try {
-            while (running) {
+        while (running) {
+            try {
                 DeliveryChannel deliveryChannel = context.getDeliveryChannel();
                 System.out.println("about to do an accept on deliveryChannel: " + deliveryChannel);
                 MessageExchange messageExchange = deliveryChannel.accept();
                 System.out.println("received me: " + messageExchange);
                 onMessageExchange(messageExchange);
             }
+            catch (MessagingException e) {
+                log.error("Failed to process inbound messages: " + e, e);
+            }
         }
-        catch (MessagingException e) {
-            log.error("Failed to process inbound messages: " + e, e);
-        }
     }
 
     public void onMessageExchange(MessageExchange exchange) throws MessagingException {

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractClusteredTransactionTest.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -101,9 +101,7 @@
     }
 
     public void testClusteredAsyncSendAsyncReceive() throws Exception {
-        // TODO: correct this test as it fails currently
-        throw new Exception("Correct this test case");
-    	//runClusteredTest(false, false);
+    	runClusteredTest(false, false);
     }
 
 }

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/AbstractPersistenceTest.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -54,8 +54,8 @@
     }
     
     protected void runSimpleTest(final boolean syncSend, final boolean syncReceive) throws Exception {
-    	final int numMessages = 10;
-    	//final int numMessages = NUM_MESSAGES;
+    	//final int numMessages = 1;
+    	final int numMessages = NUM_MESSAGES;
         final SenderComponent sender = new SenderComponent();
         sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
         final Receiver receiver;
@@ -65,13 +65,13 @@
         	    public void onMessageExchange(MessageExchange exchange) throws MessagingException {
         	    	try {
         	    		if (delivered.get(exchange.getExchangeId()) == null) {
+        	    		    System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
 	        	    		delivered.put(exchange.getExchangeId(), Boolean.TRUE);
         	    			tm.setRollbackOnly();
         	    			done(exchange);
-        	    			System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
 	        	    	} else {
+	        	    	    System.err.println("Message delivery accepted: " + exchange.getExchangeId());
 	        	    		super.onMessageExchange(exchange);
-        	    			System.err.println("Message delivery accepted: " + exchange.getExchangeId());
 	        	    	}
     	    		} catch (Exception e) {
     	    			throw new MessagingException(e);
@@ -83,14 +83,14 @@
         	    public void onMessageExchange(MessageExchange exchange) throws MessagingException {
         	    	try {
         	    		if (delivered.get(exchange.getExchangeId()) == null) {
+        	    		    System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
+        	    		    delivered.put(exchange.getExchangeId(), Boolean.TRUE);
         	    			tm.setRollbackOnly();
         	    	        exchange.setStatus(ExchangeStatus.DONE);
         	    	        getContext().getDeliveryChannel().send(exchange);
-        	    			System.err.println("Message delivery rolled back: " + exchange.getExchangeId());
 	        	    	} else {
-	        	    		delivered.put(exchange.getExchangeId(), Boolean.TRUE);
+	        	    	    System.err.println("Message delivery accepted: " + exchange.getExchangeId());
 	        	    		super.onMessageExchange(exchange);
-        	    			System.err.println("Message delivery accepted: " + exchange.getExchangeId());
 	        	    	}
     	    		} catch (Exception e) {
     	    			throw new MessagingException(e);

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowPersistentTest.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowPersistentTest.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowPersistentTest.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -72,9 +72,8 @@
     	}
     }
 
-    // TODO: this one should work
     public void testAsyncSendAsyncReceive() throws Exception {
-    	//runSimpleTest(false, false);
+    	runSimpleTest(false, false);
     }
 
 }

Modified: trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowTransactionTest.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowTransactionTest.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/messaging/JcaFlowTransactionTest.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -50,4 +50,22 @@
     	return flow;
     }
 
+    public void testSyncSendSyncReceive() throws Exception {
+        try {
+            runSimpleTest(true, true);
+            fail("sendSync can not be used");
+        } catch (IllegalStateException e) {
+            // sendSync can not be used
+        }
+    }
+
+    public void testSyncSendAsyncReceive() throws Exception {
+        try {
+            runSimpleTest(true, false);
+            fail("sendSync can not be used");
+        } catch (IllegalStateException e) {
+            // sendSync can not be used
+        }
+    }
+
 }

Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -183,6 +183,6 @@
         assertFalse(receiver2.getMessageList().hasReceivedMessage());
         receiver1.getMessageList().flushMessages();
         receiver2.getMessageList().flushMessages();
-        
     }
+    
 }

Modified: trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java (748 => 749)

--- trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java	2005-11-04 12:05:20 UTC (rev 748)
+++ trunk/core/src/test/java/org/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java	2005-11-04 15:24:01 UTC (rev 749)
@@ -82,4 +82,45 @@
       Thread.sleep(3000);
       receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
     }
+
+    public void testClusteredInOnly() throws Exception {
+        final SenderComponent sender = new SenderComponent();
+        final ReceiverComponent receiver1 =  new ReceiverComponent();
+        final ReceiverComponent receiver2 =  new ReceiverComponent();
+        sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
+
+        senderContainer.activateComponent(new ActivationSpec("sender", sender));
+        senderContainer.activateComponent(new ActivationSpec("receiver", receiver1));
+        receiverContainer.activateComponent(new ActivationSpec("receiver", receiver2));
+        Thread.sleep(1000);
+
+        sender.sendMessages(NUM_MESSAGES);
+        Thread.sleep(3000);
+        assertTrue(receiver1.getMessageList().hasReceivedMessage());
+        assertTrue(receiver2.getMessageList().hasReceivedMessage());
+        receiver1.getMessageList().flushMessages();
+        receiver2.getMessageList().flushMessages();
+        
+        senderContainer.deactivateComponent("receiver");
+        Thread.sleep(1000);
+        
+        sender.sendMessages(NUM_MESSAGES);
+        Thread.sleep(3000);
+        assertFalse(receiver1.getMessageList().hasReceivedMessage());
+        assertTrue(receiver2.getMessageList().hasReceivedMessage());
+        receiver1.getMessageList().flushMessages();
+        receiver2.getMessageList().flushMessages();
+        
+        senderContainer.activateComponent(new ActivationSpec("receiver", receiver1));
+        receiverContainer.deactivateComponent("receiver");
+        Thread.sleep(1000);
+        
+        sender.sendMessages(NUM_MESSAGES);
+        Thread.sleep(3000);
+        assertTrue(receiver1.getMessageList().hasReceivedMessage());
+        assertFalse(receiver2.getMessageList().hasReceivedMessage());
+        receiver1.getMessageList().flushMessages();
+        receiver2.getMessageList().flushMessages();
+    }
+    
 }

Reply via email to