Author: gtully
Date: Fri Mar 13 11:59:08 2009
New Revision: 753214

URL: http://svn.apache.org/viewvc?rev=753214&view=rev
Log:
resolve AMQ-2102|https://issues.apache.org/activemq/browse/AMQ-2102 - refactor 
message dispatch on slave to take account of subscription choice on the master, 
this ensures slave is in sync w.r.t outstanding acks. 
processDispatchNotification imoplemented by Queue type destinations which 
delegates to subscription after doing a dispatch, test demonstrates slve out of 
sync errors

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
 Fri Mar 13 11:59:08 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.Connection;
@@ -28,6 +30,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.ExceptionResponse;
@@ -58,8 +61,9 @@
     private static final Log LOG = LogFactory.getLog(MasterBroker.class);
     private Transport slave;
     private AtomicBoolean started = new AtomicBoolean(false);
-    private final Object addConsumerLock = new Object();
 
+    private Map<ConsumerId, ConsumerId> consumers = new 
ConcurrentHashMap<ConsumerId, ConsumerId>();
+    
     /**
      * Constructor
      * 
@@ -197,14 +201,19 @@
      * @throws Exception
      */
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo 
info) throws Exception {
-        // as master and slave do independent dispatch, the consumer add order 
between master and slave
-        // needs to be maintained
-        synchronized (addConsumerLock) {
-           sendSyncToSlave(info);
-           return super.addConsumer(context, info);
-        }
+        sendSyncToSlave(info);
+        consumers.put(info.getConsumerId(), info.getConsumerId());
+        return super.addConsumer(context, info);
     }
 
+    @Override
+    public void removeConsumer(ConnectionContext context, ConsumerInfo info)
+            throws Exception {
+        super.removeConsumer(context, info);
+        consumers.remove(info.getConsumerId());
+        sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
+   }
+
     /**
      * remove a subscription
      * 
@@ -317,7 +326,9 @@
         if (messageDispatch.getMessage() != null) {
             Message msg = messageDispatch.getMessage();
             mdn.setMessageId(msg.getMessageId());
-            sendSyncToSlave(mdn);
+            if (consumers.containsKey(messageDispatch.getConsumerId())) {
+                sendSyncToSlave(mdn);
+            }
         }
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 Fri Mar 13 11:59:08 2009
@@ -418,6 +418,34 @@
         Subscription sub = 
subscriptions.get(messageDispatchNotification.getConsumerId());
         if (sub != null) {
             
sub.processMessageDispatchNotification(messageDispatchNotification);
+        } else {
+            throw new JMSException("Slave broker out of sync with master - 
Subscription: "
+                    + messageDispatchNotification.getConsumerId()
+                    + " on " + messageDispatchNotification.getDestination()
+                    + " does not exist for dispatch of message: "
+                    + messageDispatchNotification.getMessageId());
+        }
+    }
+    
+    /*
+     * For a Queue/TempQueue, dispatch order is imperative to match acks, so 
the dispatch is deferred till 
+     * the notification to ensure that the subscription chosen by the master 
is used. AMQ-2102
+     */ 
+    protected void 
processDispatchNotificationViaDestination(MessageDispatchNotification 
messageDispatchNotification) throws Exception {
+        Destination dest = null;
+        synchronized (destinationsMutex) {
+            dest = 
destinations.get(messageDispatchNotification.getDestination());
+        }
+        if (dest != null) {
+            dest.processDispatchNotification(messageDispatchNotification);
+        } else {
+            throw new JMSException(
+                    "Slave broker out of sync with master - Destination: " 
+                            + messageDispatchNotification.getDestination()
+                            + " does not exist for consumer "
+                            + messageDispatchNotification.getConsumerId()
+                            + " with message: "
+                            + messageDispatchNotification.getMessageId());
         }
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 Fri Mar 13 11:59:08 2009
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 
+import javax.jms.JMSException;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
@@ -27,6 +29,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
@@ -485,4 +488,9 @@
             }
         }
     }
+    
+    public void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws 
Exception {
+    }
+
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 Fri Mar 13 11:59:08 2009
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Task;
@@ -175,4 +176,12 @@
      void isFull(ConnectionContext context,Usage usage);
 
     List<Subscription> getConsumers();
+
+    /**
+     * called on Queues in slave mode to allow dispatch to follow subscription 
choice of master
+     * @param messageDispatchNotification
+     * @throws Exception
+     */
+    void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws 
Exception;
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 Fri Mar 13 11:59:08 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
@@ -259,4 +260,9 @@
     public void setMaxBrowsePageSize(int maxPageSize) {
         next.setMaxBrowsePageSize(maxPageSize);
     }
+
+    public void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws 
Exception {
+        next.processDispatchNotification(messageDispatchNotification);   
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Fri Mar 13 11:59:08 2009
@@ -38,7 +38,6 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
-import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -48,15 +47,14 @@
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
-import org.apache.activemq.broker.region.group.MessageGroupSet;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerInfo;
@@ -65,7 +63,6 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.DeterministicTaskRunner;
@@ -1001,7 +998,7 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        boolean pageInMoreMessages = false;
+        boolean pageInMoreMessages = false;   
         synchronized(iteratingMutex) {
             BrowserDispatch rd;
                while ((rd = getNextBrowserDispatch()) != null) {
@@ -1244,13 +1241,13 @@
                 // Only page in the minimum number of messages which can be 
dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
toPageIn);
             }
-            if ((force || !consumers.isEmpty()) && toPageIn > 0) {
-                messages.setMaxBatchSize(toPageIn);
+            
+            if ((force || !consumers.isEmpty()) && toPageIn > 0) { 
                 int count = 0;
                 result = new ArrayList<QueueMessageReference>(toPageIn);
                 synchronized (messages) {
                     try {
-                      
+                        messages.setMaxBatchSize(toPageIn);
                         messages.reset();
                         while (messages.hasNext() && count < toPageIn) {
                             MessageReference node = messages.next();
@@ -1326,7 +1323,8 @@
         List<Subscription> consumers;
         
         synchronized (this.consumers) {
-            if (this.consumers.isEmpty()) {
+            if (this.consumers.isEmpty() || isSlave()) {
+                // slave dispatch happens in processDispatchNotification
                 return list;
             }
             consumers = new ArrayList<Subscription>(this.consumers);
@@ -1422,4 +1420,104 @@
         return total;
     }
 
+    /* 
+     * In slave mode, dispatch is ignored till we get this notification as the 
dispatch
+     * process is non deterministic between master and slave.
+     * On a notification, the actual dispatch to the subscription (as chosen 
by the master) 
+     * is completed. 
+     * (non-Javadoc)
+     * @see 
org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+     */
+    public void processDispatchNotification(
+            MessageDispatchNotification messageDispatchNotification) throws 
Exception {
+        // do dispatch
+        Subscription sub = 
getMatchingSubscription(messageDispatchNotification);
+        if (sub != null) {
+            MessageReference message = 
getMatchingMessage(messageDispatchNotification);
+            sub.add(message);   
+            
sub.processMessageDispatchNotification(messageDispatchNotification);
+        }
+    }
+
+    private QueueMessageReference 
getMatchingMessage(MessageDispatchNotification messageDispatchNotification) 
throws Exception {
+        QueueMessageReference message = null;
+        MessageId messageId = messageDispatchNotification.getMessageId();
+        
+        dispatchLock.lock();
+        try {
+            synchronized (pagedInPendingDispatch) {
+               for(QueueMessageReference ref : pagedInPendingDispatch) {
+                   if (messageId.equals(ref.getMessageId())) {
+                       message = ref;
+                       pagedInPendingDispatch.remove(ref);
+                       break;
+                   }
+               }
+            }
+    
+            if (message == null) {
+                synchronized (pagedInMessages) {
+                    message = pagedInMessages.get(messageId);
+                }
+            }
+            
+            if (message == null) {            
+                synchronized (messages) {
+                    try {
+                        messages.setMaxBatchSize(getMaxPageSize());
+                        messages.reset();
+                        while (messages.hasNext()) {
+                            MessageReference node = messages.next();
+                            node.incrementReferenceCount();
+                            messages.remove();
+                            if (messageId.equals(node.getMessageId())) {
+                                message = 
this.createMessageReference(node.getMessage());
+                                break;
+                            }
+                        }
+                    } finally {
+                        messages.release();
+                    }
+                }
+            }
+            
+            if (message == null) {
+                Message msg = loadMessage(messageId);
+                if (msg != null) {
+                    message = this.createMessageReference(msg);
+                }
+            }          
+            
+        } finally {
+            dispatchLock.unlock();        
+        }
+        if (message == null) {
+            throw new JMSException(
+                    "Slave broker out of sync with master - Message: "
+                    + messageDispatchNotification.getMessageId()
+                    + " on " + messageDispatchNotification.getDestination()
+                    + " does not exist among pending(" + 
pagedInPendingDispatch.size() + ") for subscription: "
+                    + messageDispatchNotification.getConsumerId());
+        }
+        return message;
+    }
+
+    /**
+     * Find a consumer that matches the id in the message dispatch notification
+     * @param messageDispatchNotification
+     * @return sub or null if the subscription has been removed before dispatch
+     * @throws JMSException
+     */
+    private Subscription getMatchingSubscription(MessageDispatchNotification 
messageDispatchNotification) throws JMSException {
+        Subscription sub = null;
+        synchronized (consumers) {
+            for (Subscription s : consumers) {
+                if 
(messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId()))
 {
+                    sub = s;
+                    break;
+                }
+            }
+        }
+        return sub;
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
 Fri Mar 13 11:59:08 2009
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -64,4 +65,15 @@
         }
         return inactiveDestinations;
     }
+    
+    /*
+     * For a Queue, dispatch order is imperative to match acks, so the 
dispatch is deferred till 
+     * the notification to ensure that the subscription chosen by the master 
is used.
+     * 
+     * (non-Javadoc)
+     * @see 
org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+     */
+    public void processDispatchNotification(MessageDispatchNotification 
messageDispatchNotification) throws Exception {
+        processDispatchNotificationViaDestination(messageDispatchNotification);
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
 Fri Mar 13 11:59:08 2009
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
@@ -72,4 +73,16 @@
 
         super.removeDestination(context, destination, timeout);
     }
+    
+    /*
+     * For a Queue, dispatch order is imperative to match acks, so the 
dispatch is deferred till 
+     * the notification to ensure that the subscription chosen by the master 
is used.
+     * 
+     * (non-Javadoc)
+     * @see 
org.apache.activemq.broker.region.AbstractRegion#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+     */
+    public void processDispatchNotification(MessageDispatchNotification 
messageDispatchNotification) throws Exception {
+        processDispatchNotificationViaDestination(messageDispatchNotification);
+    }
+
 }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
 Fri Mar 13 11:59:08 2009
@@ -108,11 +108,8 @@
         RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
                 RegionBroker.class);
 
-        // REVISIT the following two are not dependable at the moment, off by 
a small number
-        // for some reason? The work for a COUNT < ~500
-        //
-        //assertEquals("inflight match", 
rb.getDestinationStatistics().getInflight().getCount(), 
masterRb.getDestinationStatistics().getInflight().getCount());
-        //assertEquals("enqueues match", 
rb.getDestinationStatistics().getEnqueues().getCount(), 
masterRb.getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("inflight match", 
rb.getDestinationStatistics().getInflight().getCount(), 
masterRb.getDestinationStatistics().getInflight().getCount());
+        assertEquals("enqueues match", 
rb.getDestinationStatistics().getEnqueues().getCount(), 
masterRb.getDestinationStatistics().getEnqueues().getCount());
         
         assertEquals("dequeues match",
                 rb.getDestinationStatistics().getDequeues().getCount(),

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=753214&r1=753213&r2=753214&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
 Fri Mar 13 11:59:08 2009
@@ -47,14 +47,15 @@
 import org.apache.commons.logging.LogFactory;
 
 public class AMQ2102Test extends TestCase implements UncaughtExceptionHandler {
+       
+    final static int MESSAGE_COUNT = 12120;
+    final static int NUM_CONSUMERS = 10;
+    final static int CONSUME_ALL = -1;
     
     
-    final static int MESSAGE_COUNT = 5120;
-    final static int NUM_CONSUMERS = 20;
-    
     private static final Log LOG = LogFactory.getLog(AMQ2102Test.class);
     
-    private final Map<Thread, Throwable> exceptions = new 
ConcurrentHashMap<Thread, Throwable>();
+    private final static Map<Thread, Throwable> exceptions = new 
ConcurrentHashMap<Thread, Throwable>();
     
     private class Consumer implements Runnable, ExceptionListener {
         private ActiveMQConnectionFactory connectionFactory;
@@ -63,12 +64,14 @@
         private boolean running;
         private org.omg.CORBA.IntHolder startup;
         private Thread thread;
+        private int numToProcessPerIteration;
 
-        Consumer(ActiveMQConnectionFactory connectionFactory, String 
queueName, org.omg.CORBA.IntHolder startup, int id) {
+        Consumer(ActiveMQConnectionFactory connectionFactory, String 
queueName, org.omg.CORBA.IntHolder startup, int id, int numToProcess) {
             this.connectionFactory = connectionFactory;
             this.queueName = queueName;
             this.startup = startup;
             name = "Consumer-" + queueName + "-" + id;
+            numToProcessPerIteration = numToProcess;
             thread = new Thread(this, name);
         }
 
@@ -93,6 +96,7 @@
         }
 
         public void onException(JMSException e) {
+            exceptions.put(Thread.currentThread(), e);
             error("JMS exception: ", e);
         }
 
@@ -146,7 +150,13 @@
             Session session = null;
             try {
                 session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-                processMessages(session);
+                if (numToProcessPerIteration > 0) {
+                    while(isRunning()) {
+                        processMessages(session);
+                    }
+                } else {
+                    processMessages(session);
+                }
             } finally {
                 if (session != null) {
                     session.close();
@@ -189,7 +199,8 @@
                 }
                 startup = null;
             }
-            while (isRunning()) {
+            int numToProcess = numToProcessPerIteration;
+            do {
                 Message message = consumer.receive(5000);
 
                 if (message != null) {
@@ -201,7 +212,7 @@
                         session.rollback();
                     }
                 }
-            }
+            } while ((numToProcess == CONSUME_ALL || --numToProcess > 0) && 
isRunning());
         }
 
         public void run() {
@@ -224,7 +235,7 @@
         }
     }
     
-    private class Producer {
+    private class Producer implements ExceptionListener {
         private ActiveMQConnectionFactory connectionFactory;
         private String queueName;
         
@@ -246,6 +257,7 @@
 
             try {
                 connection = (ActiveMQConnection) 
connectionFactory.createConnection();
+                connection.setExceptionListener(this);
                 connection.start();
 
                 sendMessages(connection);
@@ -302,6 +314,7 @@
                 sendMessages(session, replyQueue, consumer);
             } finally {
                 consumer.close();
+                session.commit();
             }
         }
 
@@ -326,9 +339,8 @@
             }
         }
 
-        private void sendMessages(Session session, Destination replyQueue, 
MessageConsumer consumer) throws JMSException {
+        private void sendMessages(final Session session, Destination 
replyQueue, MessageConsumer consumer) throws JMSException {
             final org.omg.CORBA.IntHolder messageCount = new 
org.omg.CORBA.IntHolder(MESSAGE_COUNT);
-
             consumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message reply) {
                     if (reply instanceof TextMessage) {
@@ -340,6 +352,15 @@
                                 error("Problem processing reply", e);
                             }
                             messageCount.value--;
+                            if (messageCount.value % 200 == 0) {
+                                // ack a bunch of replys
+                                info("acking via session commit: 
messageCount=" + messageCount.value);
+                                try {
+                                    session.commit();
+                                } catch (JMSException e) {
+                                    error("Failed to commit with count: " + 
messageCount.value, e);
+                                }
+                            }
                             messageCount.notify();
                         }
                     } else {
@@ -354,11 +375,7 @@
             synchronized (messageCount) {
                 while (messageCount.value > 0) {
                     
-                    if (messageCount.value % 100 == 0) {
-                        // ack a bunch of replys
-                        debug("acking via session commit: messageCount=" + 
messageCount.value);
-                        session.commit();
-                    }
+                    
                     try {
                         messageCount.wait();
                     } catch (InterruptedException e) {
@@ -370,12 +387,21 @@
             session.commit();
             debug("All replies received...");
         }
+
+        public void onException(JMSException exception) {
+           LOG.error(exception);
+           exceptions.put(Thread.currentThread(), exception);
+        }
     }
 
     private static void debug(String message) {
         LOG.debug(message);
     }
 
+    private static void info(String message) {
+        LOG.info(message);
+    }
+    
     private static void error(String message) {
         LOG.error(message);
     }
@@ -384,15 +410,17 @@
         t.printStackTrace();
         String msg = message + ": " + (t.getMessage() != null ? t.getMessage() 
: t.toString());
         LOG.error(msg, t);
+        exceptions.put(Thread.currentThread(), t);
         fail(msg);
     }
 
-    private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory 
connectionFactory, String queueName, int max) {
+    private ArrayList<Consumer> createConsumers(ActiveMQConnectionFactory 
connectionFactory, String queueName, 
+            int max, int numToProcessPerConsumer) {
         ArrayList<Consumer> consumers = new ArrayList<Consumer>(max);
         org.omg.CORBA.IntHolder startup = new org.omg.CORBA.IntHolder(max);
 
         for (int id = 0; id < max; id++) {
-            consumers.add(new Consumer(connectionFactory, queueName, startup, 
id));
+            consumers.add(new Consumer(connectionFactory, queueName, startup, 
id, numToProcessPerConsumer));
         }
         for (Consumer consumer : consumers) {
             consumer.start();
@@ -445,6 +473,7 @@
     public void tearDown() throws Exception {
         master.stop();
         slave.stop();
+        exceptions.clear();
     }
     
     public void testMasterSlaveBug() throws Exception {
@@ -453,7 +482,7 @@
         ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("failover:(" + 
                 masterUrl + ")?randomize=false");
         String queueName = "MasterSlaveBug";
-        ArrayList<Consumer> consumers = createConsumers(connectionFactory, 
queueName, NUM_CONSUMERS);
+        ArrayList<Consumer> consumers = createConsumers(connectionFactory, 
queueName, NUM_CONSUMERS, CONSUME_ALL);
         
         Producer producer = new Producer(connectionFactory, queueName);
         producer.execute(new String[]{});
@@ -468,10 +497,31 @@
         assertTrue(exceptions.isEmpty());
     }
 
+    
+    public void testMasterSlaveBugWithStopStartConsumers() throws Exception {
+
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(
+                "failover:(" + masterUrl + ")?randomize=false");
+        String queueName = "MasterSlaveBug";
+        ArrayList<Consumer> consumers = createConsumers(connectionFactory,
+                queueName, NUM_CONSUMERS, 10);
+
+        Producer producer = new Producer(connectionFactory, queueName);
+        producer.execute(new String[] {});
+
+        for (Consumer consumer : consumers) {
+            consumer.setRunning(false);
+        }
+
+        for (Consumer consumer : consumers) {
+            consumer.join();
+        }
+        assertTrue(exceptions.isEmpty());
+    }
+
     public void uncaughtException(Thread t, Throwable e) {
         error("" + t + e);
         exceptions.put(t,e);
-        
-        
     }
 }


Reply via email to