Author: rgodfrey
Date: Thu Jan  3 05:23:04 2008
New Revision: 608477

URL: http://svn.apache.org/viewvc?rev=608477&view=rev
Log:
QPID-499 : Added per-virtual host timed tasks to inspect queues (with no 
consumers) for expired messages

Modified:
    incubator/qpid/branches/M2/java/broker/etc/config.xml
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java

Modified: incubator/qpid/branches/M2/java/broker/etc/config.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/etc/config.xml?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2/java/broker/etc/config.xml Thu Jan  3 05:23:04 
2008
@@ -98,6 +98,10 @@
                     
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
                 </store>
 
+                <housekeeping>
+                    
<expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
+                </housekeeping>
+
                 <security>
                     <!-- Need protocol changes to allow this-->
                     <authentication>

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Thu Jan  3 05:23:04 2008
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -37,6 +38,8 @@
 import javax.management.JMException;
 import java.text.MessageFormat;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -153,10 +156,7 @@
     /** total messages received by the queue since startup. */
     public AtomicLong _totalMessagesReceived = new AtomicLong();
 
-    public int compareTo(Object o)
-    {
-        return _name.compareTo(((AMQQueue) o).getName());
-    }
+
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString 
owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -936,5 +936,22 @@
     public void subscriberHasPendingResend(boolean hasContent, 
SubscriptionImpl subscription, AMQMessage msg)
     {
         _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+    }
+
+    public int compareTo(Object o)
+    {
+        return _name.compareTo(((AMQQueue) o).getName());
+    }
+
+
+    public void removeExpiredIfNoSubscribers() throws AMQException
+    {
+        synchronized(_subscribers.getChangeLock())
+        {
+            if(_subscribers.isEmpty())
+            {
+                _deliveryMgr.removeExpired();
+            }
+        }
     }
 }

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Thu Jan  3 05:23:04 2008
@@ -211,6 +211,29 @@
         }
     }
 
+    /**
+     *  NOTE : This method should only be called when there are no active 
subscribers
+     */
+    public void removeExpired() throws AMQException
+    {
+        _lock.lock();
+
+
+        for(Iterator<AMQMessage> iter = _messages.iterator(); iter.hasNext();)
+        {
+            AMQMessage msg = iter.next();
+            if(msg.expired(_queue))
+            {
+                _queue.dequeue(_reapingStoreContext,msg);
+                msg.decrementReference(_reapingStoreContext);
+                iter.remove();
+            }
+        }
+
+
+        _lock.unlock();
+    }
+
     /** @return the state of the async processor. */
     public boolean isProcessingAsync()
     {

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 Thu Jan  3 05:23:04 2008
@@ -97,4 +97,6 @@
     long getOldestMessageArrival();
 
     void subscriberHasPendingResend(boolean hasContent, Subscription 
subscription, AMQMessage msg);
+
+    void removeExpired() throws AMQException;
 }

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 Thu Jan  3 05:23:04 2008
@@ -37,7 +37,8 @@
 
     /** Used to control the round robin delivery of content */
     private int _currentSubscriber;
-    private final Object _subscriptionsChange = new Object();
+
+    private final Object _changeLock = new Object();
 
 
     /** Accessor for unit tests. */
@@ -48,7 +49,7 @@
 
     public void addSubscriber(Subscription subscription)
     {
-        synchronized (_subscriptionsChange)
+        synchronized (_changeLock)
         {
             _subscriptions.add(subscription);
         }
@@ -66,7 +67,7 @@
         // TODO: possibly need O(1) operation here.
 
         Subscription sub = null;
-        synchronized (_subscriptionsChange)
+        synchronized (_changeLock)
         {
             int subIndex = _subscriptions.indexOf(subscription);
 
@@ -226,4 +227,11 @@
     {
         return _subscriptions.size();
     }
+
+
+    public Object getChangeLock()
+    {
+        return _changeLock;
+    }
+    
 }

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 Thu Jan  3 05:23:04 2008
@@ -39,8 +39,13 @@
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+
+import java.util.Timer;
+import java.util.TimerTask;
 
 public class VirtualHost implements Accessable
 {
@@ -66,6 +71,11 @@
     private AccessManager _accessManager;
 
 
+    private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", 
true);
+
+    private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+
+
     public void setAccessableName(String name)
     {
         _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -162,6 +172,44 @@
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
+
+        initialiseHouseKeeping(hostConfig);
+
+    }
+
+    private void initialiseHouseKeeping(final Configuration hostConfig)
+    {
+
+        long period = 
hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", 
DEFAULT_HOUSEKEEPING_PERIOD);
+
+        /* add a timer task to iterate over queues, cleaning expired messages 
from queues with no consumers */
+        if(period != 0L)
+        {
+            class RemoveExpiredMessagesTask extends TimerTask
+            {
+                public void run()
+                {
+                    for(AMQQueue q : _queueRegistry.getQueues())
+                    {
+
+                        try
+                        {
+                            q.removeExpiredIfNoSubscribers();
+                        }
+                        catch (AMQException e)
+                        {
+                            _logger.error("Exception in housekeeping for 
queue: " + q.getName().toString(),e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+
+            }
+
+            _houseKeepingTimer.scheduleAtFixedRate(new 
RemoveExpiredMessagesTask(),
+                                                   period/2,
+                                                   period);
+        }
     }
 
     private void initialiseMessageStore(Configuration config) throws Exception

Modified: 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Jan  3 05:23:04 2008
@@ -39,35 +39,9 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -2106,6 +2080,72 @@
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), 
protocolHandler, nowait);
     }
+
+
+    /**
+     * Returns the number of messages currently queued for the given 
destination.
+     *
+     * <p/>Note that this operation automatically retries in the event of 
fail-over.
+     *
+     * @param amqd            The destination to be checked
+     *
+     * @return the number of queued messages.
+     *
+     * @throws AMQException If the queue cannot be declared for any reason.
+     */
+    public long getQueueDepth(final AMQDestination amqd)
+            throws AMQException
+    {
+
+        class QueueDeclareOkHandler extends SpecificMethodFrameListener
+        {
+
+            private long _messageCount;
+            private long _consumerCount;
+
+            public QueueDeclareOkHandler()
+            {
+                super(getChannelId(), QueueDeclareOkBody.class);
+            }
+
+            public boolean processMethod(int channelId, AMQMethodBody frame) 
//throws AMQException
+            {
+                boolean matches = super.processMethod(channelId, frame);
+                QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+                _messageCount = declareOk.getMessageCount();
+                _consumerCount = declareOk.getConsumerCount();
+                return matches;
+            }
+
+        }
+
+        return new FailoverNoopSupport<Long, AMQException>(
+                new FailoverProtectedOperation<Long, AMQException>()
+                {
+                    public Long execute() throws AMQException, 
FailoverException
+                    {
+
+                        AMQFrame queueDeclare =
+                                QueueDeclareBody.createAMQFrame(_channelId, 
getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                                null, // 
arguments
+                                                                
amqd.isAutoDelete(), // autoDelete
+                                                                
amqd.isDurable(), // durable
+                                                                
amqd.isExclusive(), // exclusive
+                                                                false, // 
nowait
+                                                                true, // 
passive
+                                                                
amqd.getAMQQueueName(), // queue
+                                                                getTicket()); 
// ticket
+                        QueueDeclareOkHandler okHandler = new 
QueueDeclareOkHandler();
+                        //getProtocolHandler().syncWrite(queueDeclare, 
QueueDeclareOkBody.class);
+                        
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+
+                        return okHandler._messageCount;
+                    }
+                }, _connection).execute();
+
+    }
+
+
 
     /**
      * Declares the named exchange and type of exchange.

Modified: 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
 (original)
+++ 
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
 Thu Jan  3 05:23:04 2008
@@ -215,6 +215,14 @@
                 public void remove()
                 {
                     last.remove();
+                    if(last == _mainIterator)
+                    {
+                        _size.decrementAndGet();
+                    }
+                    else
+                    {
+                        _messageHeadSize.decrementAndGet();                    
    
+                    }
                 }
             };
     }

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=608477&r1=608476&r2=608477&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
 Thu Jan  3 05:23:04 2008
@@ -25,7 +25,11 @@
 import junit.framework.Assert;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import javax.jms.JMSException;
@@ -38,6 +42,7 @@
 import javax.jms.Message;
 import javax.naming.spi.InitialContextFactory;
 import javax.naming.Context;
+import javax.naming.NamingException;
 import java.util.Hashtable;
 
 
@@ -53,21 +58,37 @@
 
     private final long TIME_TO_LIVE = 1000L;
 
-    Context _context;
-
-    private Connection _clientConnection, _producerConnection;
-
-    private MessageConsumer _consumer;
-    MessageProducer _producer;
-    Session _clientSession, _producerSession;
     private static final int MSG_COUNT = 50;
+    private static final long SERVER_TTL_TIMEOUT = 60000L;
 
     protected void setUp() throws Exception
     {
-        if (BROKER.startsWith("vm://"))
+        super.setUp();
+
+        if (usingInVMBroker())
         {
             TransportConnection.createVMBroker(1);
         }
+
+
+    }
+
+    private boolean usingInVMBroker()
+    {
+        return BROKER.startsWith("vm://");
+    }
+
+    protected void tearDown() throws Exception
+    {
+        if (usingInVMBroker())
+        {
+            TransportConnection.killAllVMBrokers();
+        }
+        super.tearDown();
+    }
+
+    public void testPassiveTTL() throws JMSException, NamingException
+    {
         InitialContextFactory factory = new 
PropertiesFileInitialContextFactory();
 
         Hashtable<String, String> env = new Hashtable<String, String>();
@@ -75,56 +96,40 @@
         env.put("connectionfactory.connection", "amqp://guest:[EMAIL 
PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
         env.put("queue.queue", QUEUE);
 
-        _context = factory.getInitialContext(env);
+        Context context = factory.getInitialContext(env);
 
-        Queue queue = (Queue) _context.lookup("queue");
+        Queue queue = (Queue) context.lookup("queue");
 
         //Create Client 1
-        _clientConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+        Connection clientConnection = ((ConnectionFactory) 
context.lookup("connection")).createConnection();
 
-        _clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Session clientSession = clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-        _consumer = _clientSession.createConsumer(queue);
+        MessageConsumer consumer = clientSession.createConsumer(queue);
 
         //Create Producer
-        _producerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
-
-        _producerConnection.start();
+        Connection producerConnection = ((ConnectionFactory) 
context.lookup("connection")).createConnection();
 
-        _producerSession = _producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        _producer = _producerSession.createProducer(queue);
-    }
+        producerConnection.start();
 
-    protected void tearDown() throws Exception
-    {
-        _clientConnection.close();
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-        _producerConnection.close();
-        super.tearDown();
-        
-        if (BROKER.startsWith("vm://"))
-        {
-            TransportConnection.killAllVMBrokers();
-        }
-    }
+        MessageProducer producer = producerSession.createProducer(queue);
 
-    public void test() throws JMSException
-    {
         //Set TTL
         int msg = 0;
-        _producer.send(nextMessage(String.valueOf(msg), true));
+        producer.send(nextMessage(String.valueOf(msg), true, producerSession, 
producer));
 
-        _producer.setTimeToLive(TIME_TO_LIVE);
+        producer.setTimeToLive(TIME_TO_LIVE);
 
         for (; msg < MSG_COUNT - 2; msg++)
         {
-            _producer.send(nextMessage(String.valueOf(msg), false));
+            producer.send(nextMessage(String.valueOf(msg), false, 
producerSession, producer));
         }
 
         //Reset TTL
-        _producer.setTimeToLive(0L);
-        _producer.send(nextMessage(String.valueOf(msg), false));
+        producer.setTimeToLive(0L);
+        producer.send(nextMessage(String.valueOf(msg), false, producerSession, 
producer));
 
          try
         {
@@ -136,31 +141,71 @@
 
         }
 
-        _clientConnection.start();
+        clientConnection.start();
 
         //Receive Message 0
-        Message received = _consumer.receive(100);
+        Message received = consumer.receive(100);
         Assert.assertNotNull("First message not received", received);
         Assert.assertTrue("First message doesn't have first set.", 
received.getBooleanProperty("first"));
         Assert.assertEquals("First message has incorrect TTL.", 0L, 
received.getLongProperty("TTL"));
 
 
-        received = _consumer.receive(100);
+        received = consumer.receive(100);
         Assert.assertNotNull("Final message not received", received);
         Assert.assertFalse("Final message has first set.", 
received.getBooleanProperty("first"));
         Assert.assertEquals("Final message has incorrect TTL.", 0L, 
received.getLongProperty("TTL"));
 
-        received = _consumer.receive(100);
+        received = consumer.receive(100);
         Assert.assertNull("More messages received", received);
+
+        clientConnection.close();
+
+        producerConnection.close();
     }
 
-    private Message nextMessage(String msg, boolean first) throws JMSException
+    private Message nextMessage(String msg, boolean first, Session 
producerSession, MessageProducer producer) throws JMSException
     {
-        Message send = _producerSession.createTextMessage("Message " + msg);
+        Message send = producerSession.createTextMessage("Message " + msg);
         send.setBooleanProperty("first", first);
-        send.setLongProperty("TTL", _producer.getTimeToLive());
+        send.setLongProperty("TTL", producer.getTimeToLive());
         return send;
     }
 
+
+    /**
+     * Tests the expired messages get actively deleted even on queues which 
have no consumers
+     */
+    public void testActiveTTL() throws URLSyntaxException, AMQException, 
JMSException, InterruptedException
+    {
+        Connection producerConnection = new 
AMQConnection(BROKER,"guest","guest","activeTTLtest","test");
+        AMQSession producerSession = (AMQSession) 
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = producerSession.createTemporaryQueue();
+        producerSession.declareAndBind((AMQDestination) queue);
+        MessageProducer producer = producerSession.createProducer(queue);
+        producer.setTimeToLive(1000L);
+
+        // send Messages
+        for(int i = 0; i < MSG_COUNT; i++)
+        {
+            producer.send(producerSession.createTextMessage("Message: "+i));
+        }
+        long failureTime = System.currentTimeMillis() + 2*SERVER_TTL_TIMEOUT;
+
+        // check Queue depth for up to TIMEOUT seconds
+        long messageCount;
+
+        do
+        {
+            Thread.sleep(100);
+            messageCount = producerSession.getQueueDepth((AMQDestination) 
queue);
+        }
+        while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+        assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+    }
 
 }


Reply via email to