Author: gtully
Date: Thu Feb 19 18:15:59 2009
New Revision: 745953

URL: http://svn.apache.org/viewvc?rev=745953&view=rev
Log:
tidy up redispatch logic a little more, resolve: AMQ-2128, deliver acks on 
dispose in auto_ack mode. also get some closure on: AMQ-2075

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Thu Feb 19 18:15:59 2009
@@ -630,7 +630,7 @@
     void deliverAcks() {
         MessageAck ack = null;
         if (deliveryingAcknowledgements.compareAndSet(false, true)) {
-            if (this.optimizeAcknowledge) {
+            if (session.isAutoAcknowledge()) {
                synchronized(deliveredMessages) {
                        ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                        if (ack != null) {
@@ -775,14 +775,12 @@
             if (session.getTransacted()) {
                 // Do nothing.
             } else if (session.isAutoAcknowledge()) {
-                synchronized (deliveredMessages) {
-                    if (!deliveredMessages.isEmpty()) {
-                        if (optimizeAcknowledge) {
-                            if (deliveryingAcknowledgements.compareAndSet(
-                                    false, true)) {
+                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
+                    synchronized (deliveredMessages) {
+                        if (!deliveredMessages.isEmpty()) {
+                            if (optimizeAcknowledge) {
                                 ackCounter++;
-                                if (ackCounter >= (info
-                                        .getCurrentPrefetchSize() * .65)) {
+                                if (ackCounter >= 
(info.getCurrentPrefetchSize() * .65)) {
                                        MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                        if (ack != null) {
                                            deliveredMessages.clear();
@@ -790,16 +788,16 @@
                                            session.sendAck(ack);
                                        }
                                 }
-                                deliveryingAcknowledgements.set(false);
-                            }
-                        } else {
-                            MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
-                            if (ack!=null) {
-                               deliveredMessages.clear();
-                               session.sendAck(ack);
+                            } else {
+                                MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                if (ack!=null) {
+                                    deliveredMessages.clear();
+                                    session.sendAck(ack);
+                                }
                             }
                         }
                     }
+                    deliveryingAcknowledgements.set(false);
                 }
             } else if (session.isDupsOkAcknowledge()) {
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Thu Feb 19 18:15:59 2009
@@ -336,8 +336,7 @@
                     }
                 }
                 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
-                MessageGroupSet ownedGroups = getMessageGroupOwners()
-                        .removeConsumer(consumerId);
+                getMessageGroupOwners().removeConsumer(consumerId);
                 
                 // redeliver inflight messages
                 List<QueueMessageReference> list = new 
ArrayList<QueueMessageReference>();
@@ -353,19 +352,10 @@
                     list.add(qmr);
                 }
                 
-                if (!list.isEmpty() && !consumers.isEmpty()) {
+                if (!list.isEmpty()) {
                     doDispatch(list);
                 }
             }
-            //if it is a last consumer (and not a browser) dispatch all 
pagedIn messages
-            if (consumers.isEmpty() && !(sub instanceof 
QueueBrowserSubscription)) {
-                       List<QueueMessageReference> list = new 
ArrayList<QueueMessageReference>();
-                       for (QueueMessageReference ref : 
pagedInMessages.values()) {
-                               list.add(ref);
-                       }
-                       pagedInPendingDispatch.clear();
-                       doDispatch(list);
-            }
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
@@ -1068,7 +1058,7 @@
                }
                
                synchronized (messages) {
-                   pageInMoreMessages = !messages.isEmpty();
+                   pageInMoreMessages |= !messages.isEmpty();
                }               
                
                // Kinda ugly.. but I think dispatchLock is the only mutex 
protecting the 
@@ -1333,14 +1323,18 @@
      *         were not full.
      */
     private List<QueueMessageReference> 
doActualDispatch(List<QueueMessageReference> list) throws Exception {
-        List<QueueMessageReference> rc = new 
ArrayList<QueueMessageReference>(list.size());
-        Set<Subscription> fullConsumers = new 
HashSet<Subscription>(this.consumers.size());
         List<Subscription> consumers;
         
         synchronized (this.consumers) {
+            if (this.consumers.isEmpty()) {
+                return list;
+            }
             consumers = new ArrayList<Subscription>(this.consumers);
         }
 
+        List<QueueMessageReference> rc = new 
ArrayList<QueueMessageReference>(list.size());
+        Set<Subscription> fullConsumers = new 
HashSet<Subscription>(this.consumers.size());
+        
         for (MessageReference node : list) {
             Subscription target = null;
             int interestCount=0;

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=745953&r1=745952&r2=745953&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
 Thu Feb 19 18:15:59 2009
@@ -358,15 +358,99 @@
         assertEquals(4, counter.get());
     }
 
-    public void 
initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
+    public void 
initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() 
{ 
         addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("ackMode", new Object[] 
{Integer.valueOf(Session.AUTO_ACKNOWLEDGE), 
Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
-                                                      
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+        addCombinationValues("ackMode", new Object[] 
{Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE)});
         addCombinationValues("destinationType", new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
     }
 
-    public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws 
Exception {
+    public void 
testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws 
Exception {
+    
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        final CountDownLatch got2Done = new CountDownLatch(1);
+
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        // This test case does not work if optimized message dispatch is used 
as
+        // the main thread send block until the consumer receives the
+        // message. This test depends on thread decoupling so that the main
+        // thread can stop the consumer thread.
+        connection.setOptimizedMessageDispatch(false);
+        connection.start();
+
+        // Use all the ack modes
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in first listener: " + tm.getText());
+                    assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 2) {
+                        sendDone.await();
+                        connection.close();
+                        got2Done.countDown();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+        sendDone.countDown();
 
+        // Wait for first 2 messages to arrive.
+        assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+        // Re-start connection.
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Pickup the remaining messages.
+        final CountDownLatch done2 = new CountDownLatch(1);
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in second listener: " + tm.getText());
+                    // order is not guaranteed as the connection is started 
before the listener is set.
+                    // assertEquals("" + counter.get(), tm.getText());
+                    counter.incrementAndGet();
+                    if (counter.get() == 4) {
+                        done2.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // assert msg 2 was redelivered as close() from onMessages() will only 
ack in auto_ack mode
+        assertEquals(5, counter.get());      
+    }
+
+    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
+        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("ackMode", new Object[] 
{Integer.valueOf(Session.AUTO_ACKNOWLEDGE), 
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
+        addCombinationValues("destinationType", new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+    }
+
+    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws 
Exception {
+    
         final AtomicInteger counter = new AtomicInteger(0);
         final CountDownLatch sendDone = new CountDownLatch(1);
         final CountDownLatch got2Done = new CountDownLatch(1);
@@ -426,13 +510,12 @@
                 try {
                     TextMessage tm = (TextMessage)m;
                     LOG.info("Got in second listener: " + tm.getText());
-                    assertEquals("" + counter.get(), tm.getText());
                     counter.incrementAndGet();
                     if (counter.get() == 4) {
                         done2.countDown();
                     }
                 } catch (Throwable e) {
-                    LOG.info("unexpected ex onMessage: ", e);
+                    LOG.error("unexpected ex onMessage: ", e);
                 }
             }
         });
@@ -440,9 +523,9 @@
         assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
         Thread.sleep(200);
 
+        // close from onMessage with Auto_ack will ack
         // Make sure only 4 messages were delivered.
         assertEquals(4, counter.get());
-
     }
 
     public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {


Reply via email to