Author: aidan
Date: Mon Jan 28 08:48:00 2008
New Revision: 615943

URL: http://svn.apache.org/viewvc?rev=615943&view=rev
Log:
Merged revisions 608477,609961,610475,610479,610806,611146 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r608477 | rgodfrey | 2008-01-03 13:23:04 +0000 (Thu, 03 Jan 2008) | 1 line
  
  QPID-499 : Added per-virtual host timed tasks to inspect queues (with no 
consumers) for expired messages
........
  r609961 | ritchiem | 2008-01-08 12:59:01 +0000 (Tue, 08 Jan 2008) | 2 lines
  
  QPID-499 : Patch to update the queue size statistics when the Active TTL 
process runs 
  Removed old single commented out code line from AMQSession.
........
  r610475 | ritchiem | 2008-01-09 17:32:43 +0000 (Wed, 09 Jan 2008) | 1 line
  
  Qpid-723 Added exec to qpid.start
........
  r610479 | ritchiem | 2008-01-09 17:39:54 +0000 (Wed, 09 Jan 2008) | 1 line
  
  Qpid-690 : Provide configurable delay between re-connecion attempts.
........
  r610806 | ritchiem | 2008-01-10 14:41:37 +0000 (Thu, 10 Jan 2008) | 1 line
  
  QPID-690 : Relaxed the timings on failover as Thread.sleep is accurate to 
10ms so may finish the sleep 10ms early. Resulting in erratic failures as 9.9s 
< 10s.
........
  r611146 | ritchiem | 2008-01-11 11:33:31 +0000 (Fri, 11 Jan 2008) | 1 line
  
  Patch by Aidan Skinner to make third constructor public. This is done so that 
the BDBMessageStore tests can still run with the addition of the VirtualHost 
reaper thread.
........

Modified:
    incubator/qpid/branches/M2.1/   (props changed)
    incubator/qpid/branches/M2.1/java/broker/etc/config.xml
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
    
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
    
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java

Propchange: incubator/qpid/branches/M2.1/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/branches/M2.1/java/broker/etc/config.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/etc/config.xml?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/etc/config.xml (original)
+++ incubator/qpid/branches/M2.1/java/broker/etc/config.xml Mon Jan 28 08:48:00 
2008
@@ -100,6 +100,10 @@
                     
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
                 </store>
 
+                <housekeeping>
+                    
<expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
+                </housekeeping>
+
                 <security>
                     <!-- Need protocol changes to allow this-->
                     <authentication>

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Mon Jan 28 08:48:00 2008
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -37,6 +38,8 @@
 import javax.management.JMException;
 import java.text.MessageFormat;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,10 +157,7 @@
     /** total messages received by the queue since startup. */
     public AtomicLong _totalMessagesReceived = new AtomicLong();
 
-    public int compareTo(Object o)
-    {
-        return _name.compareTo(((AMQQueue) o).getName());
-    }
+
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString 
owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -950,4 +950,20 @@
         return new QueueEntry(this, amqMessage);
     }
 
+    public int compareTo(Object o)
+    {
+        return _name.compareTo(((AMQQueue) o).getName());
+    }
+
+
+    public void removeExpiredIfNoSubscribers() throws AMQException
+    {
+        synchronized(_subscribers.getChangeLock())
+        {
+            if(_subscribers.isEmpty())
+            {
+                _deliveryMgr.removeExpired();
+            }
+        }
+    }
 }

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Mon Jan 28 08:48:00 2008
@@ -212,6 +212,30 @@
         }
     }
 
+    /**
+     *  NOTE : This method should only be called when there are no active 
subscribers
+     */
+    public void removeExpired() throws AMQException
+    {
+        _lock.lock();
+
+
+           for(Iterator<QueueEntry> iter = _messages.iterator(); 
iter.hasNext();)
+        {
+            QueueEntry entry = iter.next();
+            if(entry.expired())
+            {
+                // fixme: Currently we have to update the total byte size here 
for the data in the queue  
+                _totalMessageSize.addAndGet(-entry.getSize());
+                _queue.dequeue(_reapingStoreContext,entry);
+                iter.remove();
+            }
+           }
+
+
+        _lock.unlock();
+    }
+
     /** @return the state of the async processor. */
     public boolean isProcessingAsync()
     {

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 Mon Jan 28 08:48:00 2008
@@ -97,4 +97,6 @@
     long getOldestMessageArrival();
 
     void subscriberHasPendingResend(boolean hasContent, Subscription 
subscription, QueueEntry msg);
+
+    void removeExpired() throws AMQException;
 }

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 Mon Jan 28 08:48:00 2008
@@ -37,7 +37,8 @@
 
     /** Used to control the round robin delivery of content */
     private int _currentSubscriber;
-    private final Object _subscriptionsChange = new Object();
+
+    private final Object _changeLock = new Object();
 
 
     /** Accessor for unit tests. */
@@ -48,7 +49,7 @@
 
     public void addSubscriber(Subscription subscription)
     {
-        synchronized (_subscriptionsChange)
+        synchronized (_changeLock)
         {
             _subscriptions.add(subscription);
         }
@@ -66,7 +67,7 @@
         // TODO: possibly need O(1) operation here.
 
         Subscription sub = null;
-        synchronized (_subscriptionsChange)
+        synchronized (_changeLock)
         {
             int subIndex = _subscriptions.indexOf(subscription);
 
@@ -226,4 +227,11 @@
     {
         return _subscriptions.size();
     }
+
+
+    public Object getChangeLock()
+    {
+        return _changeLock;
+    }
+    
 }

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 Mon Jan 28 08:48:00 2008
@@ -1,260 +1,303 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import javax.management.NotCompliantMBeanException;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
-import org.apache.qpid.server.security.access.AccessManager;
-import org.apache.qpid.server.security.access.AccessManagerImpl;
-import org.apache.qpid.server.security.access.Accessable;
-import 
org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.configuration.Configurator;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-
-public class VirtualHost implements Accessable
-{
-    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
-
-    private final String _name;
-
-    private QueueRegistry _queueRegistry;
-
-    private ExchangeRegistry _exchangeRegistry;
-
-    private ExchangeFactory _exchangeFactory;
-
-    private MessageStore _messageStore;
-
-    protected VirtualHostMBean _virtualHostMBean;
-
-    private AMQBrokerManagerMBean _brokerMBean;
-
-    private AuthenticationManager _authenticationManager;
-
-    private AccessManager _accessManager;
-
-
-    public void setAccessableName(String name)
-    {
-        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
-                     + name + ") ignored remains :" + getAccessableName());
-    }
-
-    public String getAccessableName()
-    {
-        return _name;
-    }
-
-
-    /**
-     * Abstract MBean class. This has some of the methods implemented from 
management intrerface for exchanges. Any
-     * implementaion of an Exchange MBean should extend this class.
-     */
-    public class VirtualHostMBean extends AMQManagedObject implements 
ManagedVirtualHost
-    {
-        public VirtualHostMBean() throws NotCompliantMBeanException
-        {
-            super(ManagedVirtualHost.class, "VirtualHost");
-        }
-
-        public String getObjectInstanceName()
-        {
-            return _name.toString();
-        }
-
-        public String getName()
-        {
-            return _name.toString();
-        }
-
-        public VirtualHost getVirtualHost()
-        {
-            return VirtualHost.this;
-        }
-
-
-    } // End of MBean class
-
-    /**
-     * Used for testing only
-     * @param name
-     * @param store
-     * @throws Exception
-     */
-    public VirtualHost(String name, MessageStore store) throws Exception
-    {
-        this(name, null, store);
-    }
-
-    /**
-     * Normal Constructor
-     * @param name
-     * @param hostConfig
-     * @throws Exception
-     */
-    public VirtualHost(String name, Configuration hostConfig) throws Exception
-    {
-        this(name, hostConfig, null);
-    }
-
-    private VirtualHost(String name, Configuration hostConfig, MessageStore 
store) throws Exception
-    {
-        _name = name;
-
-        _virtualHostMBean = new VirtualHostMBean();
-        // This isn't needed to be registered
-        //_virtualHostMBean.register();
-
-        _queueRegistry = new DefaultQueueRegistry(this);
-        _exchangeFactory = new DefaultExchangeFactory(this);
-        _exchangeFactory.initialise(hostConfig);
-        _exchangeRegistry = new DefaultExchangeRegistry(this);
-
-        if (store != null)
-        {
-            _messageStore = store;
-        }
-        else
-        {
-            if (hostConfig == null)
-            {
-                throw new IllegalAccessException("HostConfig and MessageStore 
cannot be null");
-            }
-            initialiseMessageStore(hostConfig);
-        }
-
-        _exchangeRegistry.initialise();
-
-        _authenticationManager = new 
PrincipalDatabaseAuthenticationManager(name, hostConfig);
-
-        _accessManager = new AccessManagerImpl(name, hostConfig);
-
-        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
-        _brokerMBean.register();
-    }
-
-    private void initialiseMessageStore(Configuration config) throws Exception
-    {
-        String messageStoreClass = config.getString("store.class");
-
-        Class clazz = Class.forName(messageStoreClass);
-        Object o = clazz.newInstance();
-
-        if (!(o instanceof MessageStore))
-        {
-            throw new ClassCastException("Message store class must implement " 
+ MessageStore.class + ". Class " + clazz +
-                                         " does not.");
-        }
-        _messageStore = (MessageStore) o;
-        _messageStore.configure(this, "store", config);
-    }
-
-
-    public <T> T getConfiguredObject(Class<T> instanceType, Configuration 
config)
-    {
-        T instance;
-        try
-        {
-            instance = instanceType.newInstance();
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unable to instantiate configuration class " + 
instanceType + " - ensure it has a public default constructor");
-            throw new IllegalArgumentException("Unable to instantiate 
configuration class " + instanceType + " - ensure it has a public default 
constructor", e);
-        }
-        Configurator.configure(instance);
-
-        return instance;
-    }
-
-
-    public String getName()
-    {
-        return _name;
-    }
-
-    public QueueRegistry getQueueRegistry()
-    {
-        return _queueRegistry;
-    }
-
-    public ExchangeRegistry getExchangeRegistry()
-    {
-        return _exchangeRegistry;
-    }
-
-    public ExchangeFactory getExchangeFactory()
-    {
-        return _exchangeFactory;
-    }
-
-    public ApplicationRegistry getApplicationRegistry()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public MessageStore getMessageStore()
-    {
-        return _messageStore;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
-    public AccessManager getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public void close() throws Exception
-    {
-        if (_messageStore != null)
-        {
-            _messageStore.close();
-        }
-    }
-
-    public ManagedObject getBrokerMBean()
-    {
-        return _brokerMBean;
-    }
-
-    public ManagedObject getManagedObject()
-    {
-        return _virtualHostMBean;
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.security.access.AccessManager;
+import org.apache.qpid.server.security.access.AccessManagerImpl;
+import org.apache.qpid.server.security.access.Accessable;
+import 
org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.DefaultQueueRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class VirtualHost implements Accessable
+{
+    private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+
+
+    private final String _name;
+
+    private QueueRegistry _queueRegistry;
+
+    private ExchangeRegistry _exchangeRegistry;
+
+    private ExchangeFactory _exchangeFactory;
+
+    private MessageStore _messageStore;
+
+    protected VirtualHostMBean _virtualHostMBean;
+
+    private AMQBrokerManagerMBean _brokerMBean;
+
+    private AuthenticationManager _authenticationManager;
+
+    private AccessManager _accessManager;
+
+    private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", 
true);
+     
+    private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+    
+    public void setAccessableName(String name)
+    {
+        _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
+                     + name + ") ignored remains :" + getAccessableName());
+    }
+
+    public String getAccessableName()
+    {
+        return _name;
+    }
+
+
+    /**
+     * Abstract MBean class. This has some of the methods implemented from 
management intrerface for exchanges. Any
+     * implementaion of an Exchange MBean should extend this class.
+     */
+    public class VirtualHostMBean extends AMQManagedObject implements 
ManagedVirtualHost
+    {
+        public VirtualHostMBean() throws NotCompliantMBeanException
+        {
+            super(ManagedVirtualHost.class, "VirtualHost");
+        }
+
+        public String getObjectInstanceName()
+        {
+            return _name.toString();
+        }
+
+        public String getName()
+        {
+            return _name.toString();
+        }
+
+        public VirtualHost getVirtualHost()
+        {
+            return VirtualHost.this;
+        }
+
+
+    } // End of MBean class
+
+    /**
+     * Used for testing only
+     * @param name
+     * @param store
+     * @throws Exception
+     */
+    public VirtualHost(String name, MessageStore store) throws Exception
+    {
+        this(name, null, store);
+    }
+
+    /**
+     * Normal Constructor
+     * @param name
+     * @param hostConfig
+     * @throws Exception
+     */
+    public VirtualHost(String name, Configuration hostConfig) throws Exception
+    {
+        this(name, hostConfig, null);
+    }
+
+    public VirtualHost(String name, Configuration hostConfig, MessageStore 
store) throws Exception
+    {
+        _name = name;
+
+        _virtualHostMBean = new VirtualHostMBean();
+        // This isn't needed to be registered
+        //_virtualHostMBean.register();
+
+        _queueRegistry = new DefaultQueueRegistry(this);
+        _exchangeFactory = new DefaultExchangeFactory(this);
+        _exchangeFactory.initialise(hostConfig);
+        _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+        if (store != null)
+        {
+            _messageStore = store;
+        }
+        else
+        {
+            if (hostConfig == null)
+            {
+                throw new IllegalAccessException("HostConfig and MessageStore 
cannot be null");
+            }
+            initialiseMessageStore(hostConfig);
+        }
+
+        _exchangeRegistry.initialise();
+
+        _authenticationManager = new 
PrincipalDatabaseAuthenticationManager(name, hostConfig);
+
+        _accessManager = new AccessManagerImpl(name, hostConfig);
+
+        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
+        _brokerMBean.register();
+        initialiseHouseKeeping(hostConfig);
+    }
+
+    private void initialiseHouseKeeping(final Configuration hostConfig)
+    {
+     
+       long period = 
hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", 
DEFAULT_HOUSEKEEPING_PERIOD);
+    
+       /* add a timer task to iterate over queues, cleaning expired messages 
from queues with no consumers */
+       if(period != 0L)
+       {
+               class RemoveExpiredMessagesTask extends TimerTask
+               {
+                       public void run()
+                       {
+                               for(AMQQueue q : _queueRegistry.getQueues())
+                               {
+
+                                       try
+                                       {
+                                               
q.removeExpiredIfNoSubscribers();
+                                       }
+                                       catch (AMQException e)
+                                       {
+                                               _logger.error("Exception in 
housekeeping for queue: " + q.getName().toString(),e);
+                                               throw new RuntimeException(e);
+                                       }
+                               }
+                       }
+               }
+               
+               _houseKeepingTimer.scheduleAtFixedRate(new 
RemoveExpiredMessagesTask(),
+                               period/2,
+                               period);
+       }
+    }
+    
+    private void initialiseMessageStore(Configuration config) throws Exception
+    {
+        String messageStoreClass = config.getString("store.class");
+
+        Class clazz = Class.forName(messageStoreClass);
+        Object o = clazz.newInstance();
+
+        if (!(o instanceof MessageStore))
+        {
+            throw new ClassCastException("Message store class must implement " 
+ MessageStore.class + ". Class " + clazz +
+                                         " does not.");
+        }
+        _messageStore = (MessageStore) o;
+        _messageStore.configure(this, "store", config);
+    }
+
+
+    public <T> T getConfiguredObject(Class<T> instanceType, Configuration 
config)
+    {
+        T instance;
+        try
+        {
+            instance = instanceType.newInstance();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Unable to instantiate configuration class " + 
instanceType + " - ensure it has a public default constructor");
+            throw new IllegalArgumentException("Unable to instantiate 
configuration class " + instanceType + " - ensure it has a public default 
constructor", e);
+        }
+        Configurator.configure(instance);
+
+        return instance;
+    }
+
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public QueueRegistry getQueueRegistry()
+    {
+        return _queueRegistry;
+    }
+
+    public ExchangeRegistry getExchangeRegistry()
+    {
+        return _exchangeRegistry;
+    }
+
+    public ExchangeFactory getExchangeFactory()
+    {
+        return _exchangeFactory;
+    }
+
+    public ApplicationRegistry getApplicationRegistry()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _messageStore;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public AccessManager getAccessManager()
+    {
+        return _accessManager;
+    }
+
+    public void close() throws Exception
+    {
+        if (_messageStore != null)
+        {
+            _messageStore.close();
+        }
+    }
+
+    public ManagedObject getBrokerMBean()
+    {
+        return _brokerMBean;
+    }
+
+    public ManagedObject getManagedObject()
+    {
+        return _virtualHostMBean;
+    }
+}

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Jan 28 08:48:00 2008
@@ -39,6 +39,7 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
@@ -2147,6 +2148,70 @@
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), 
protocolHandler, nowait);
     }
+
+
+    /**
+     * Returns the number of messages currently queued for the given 
destination.
+     *
+     * <p/>Note that this operation automatically retries in the event of 
fail-over.
+     *
+     * @param amqd            The destination to be checked
+     *
+     * @return the number of queued messages.
+     *
+     * @throws AMQException If the queue cannot be declared for any reason.
+     */
+    public long getQueueDepth(final AMQDestination amqd)
+            throws AMQException
+    {
+
+        class QueueDeclareOkHandler extends SpecificMethodFrameListener
+        {
+
+            private long _messageCount;
+            private long _consumerCount;
+
+            public QueueDeclareOkHandler()
+            {
+                super(getChannelId(), QueueDeclareOkBody.class);
+            }
+
+            public boolean processMethod(int channelId, AMQMethodBody frame) 
//throws AMQException
+            {
+                boolean matches = super.processMethod(channelId, frame);
+                QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+                _messageCount = declareOk.getMessageCount();
+                _consumerCount = declareOk.getConsumerCount();
+                return matches;
+            }
+
+        }
+
+        return new FailoverNoopSupport<Long, AMQException>(
+                new FailoverProtectedOperation<Long, AMQException>()
+                {
+                    public Long execute() throws AMQException, 
FailoverException
+                    {
+
+                       AMQFrame queueDeclare =
+                               
getMethodRegistry().createQueueDeclareBody(getTicket(),
+                                                                               
                                   amqd.getAMQQueueName(),
+                                                                               
                                   true,
+                                                                               
                                   amqd.isDurable(),
+                                                                               
                                   amqd.isExclusive(),
+                                                                               
                                   amqd.isAutoDelete(),
+                                                                               
                                   false,
+                                                                               
                                   null).generateFrame(_channelId);
+                        QueueDeclareOkHandler okHandler = new 
QueueDeclareOkHandler();
+                        
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+
+                        return okHandler._messageCount;
+                    }
+                }, _connection).execute();
+
+    }
+
+
 
     /**
      * Declares the named exchange and type of exchange.

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 Mon Jan 28 08:48:00 2008
@@ -283,7 +283,10 @@
     public static void killAllVMBrokers()
     {
         _logger.info("Killing all VM Brokers");
-        _acceptor.unbindAll();
+        if (_acceptor != null)
+        {
+               _acceptor.unbindAll();
+        }
         synchronized (_inVmPipeAddress)
         {
             _inVmPipeAddress.clear();

Modified: 
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
 Mon Jan 28 08:48:00 2008
@@ -215,6 +215,14 @@
                 public void remove()
                 {
                     last.remove();
+                    if(last == _mainIterator)
+                    {
+                        _size.decrementAndGet();
+                    }
+                    else
+                    {
+                        _messageHeadSize.decrementAndGet();                    
    
+                    }
                 }
             };
     }

Modified: 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/failover/FailoverMethodTest.java
 Mon Jan 28 08:48:00 2008
@@ -1,19 +1,4 @@
-package org.apache.qpid.server.failover;
-
-import junit.framework.TestCase;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import java.util.concurrent.CountDownLatch;/*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -33,26 +18,51 @@
  * under the License.
  *
  */
+package org.apache.qpid.server.failover;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import java.util.concurrent.CountDownLatch;
 
 public class FailoverMethodTest extends TestCase implements ExceptionListener
 {
-    private static final Logger _logger = 
LoggerFactory.getLogger(FailoverMethodTest.class);
     private CountDownLatch _failoverComplete = new CountDownLatch(1);
 
     public void setUp() throws AMQVMBrokerCreationException
     {
+        TransportConnection.createVMBroker(1);
     }
 
     public void tearDown() throws AMQVMBrokerCreationException
     {
+        TransportConnection.killAllVMBrokers();
     }
 
-    public void testFailoverRoundRobinDelay() throws URLSyntaxException, 
AMQVMBrokerCreationException, InterruptedException, JMSException
+    /**
+     * Test that the round robin method has the correct delays.
+     * The first connection to vm://:1 will work but the localhost connection 
should fail but the duration it takes
+     * to report the failure is what is being tested.
+     *
+     * @throws URLSyntaxException
+     * @throws InterruptedException
+     * @throws JMSException
+     */
+    public void testFailoverRoundRobinDelay() throws URLSyntaxException, 
InterruptedException, JMSException
     {
         String connectionString = 
"amqp://guest:guest@/test?brokerlist='vm://:1;tcp://localhost:5670?connectdelay='2000',retries='3''";
 
         AMQConnectionURL url = new AMQConnectionURL(connectionString);
-        TransportConnection.createVMBroker(1);
 
         try
         {
@@ -64,9 +74,15 @@
             TransportConnection.killAllVMBrokers();
 
             _failoverComplete.await();
+
             long end = System.currentTimeMillis();
 
-            assertTrue("Failover took at over 10seconds", (end - start) > 
6000);
+            //Failover should take less that 10 seconds.
+            // This is calculated by vm://:1 two retries left after initial 
connection (4s)
+            // localhost get three retries so (6s) so 10s in total for 
connection dropping
+            assertTrue("Failover took less than 9.5 seconds:"+(end - start), 
(end - start) > 9500);
+            // The sleep method is not 100% accurate under windows so with 5 
sleeps and a 10ms accuracy then there is
+            // the potential for the tests to finish in 500ms sooner than the 
predicted 10s.
 
         }
         catch (AMQException e)
@@ -80,7 +96,6 @@
         String connectionString = 
"amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='2000',retries='3''";
 
         AMQConnectionURL url = new AMQConnectionURL(connectionString);
-        TransportConnection.createVMBroker(1);
 
         try
         {
@@ -92,9 +107,16 @@
             TransportConnection.killAllVMBrokers();
 
             _failoverComplete.await();
+
             long end = System.currentTimeMillis();
 
-            assertTrue("Failover took at over 10seconds", (end - start) > 
6000);
+            //Failover should take less that 10 seconds.
+            // This is calculated by vm://:1 two retries left after initial 
connection
+            // so 4s in total for connection dropping
+
+            assertTrue("Failover took less than 3.7 seconds", (end - start) > 
3700);
+            // The sleep method is not 100% accurate under windows so with 3 
sleeps and a 10ms accuracy then there is
+            // the potential for the tests to finish in 300ms sooner than the 
predicted 4s.
 
         }
         catch (AMQException e)

Modified: 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=615943&r1=615942&r2=615943&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
 Mon Jan 28 08:48:00 2008
@@ -25,7 +25,11 @@
 import junit.framework.Assert;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import javax.jms.JMSException;
@@ -38,6 +42,7 @@
 import javax.jms.Message;
 import javax.naming.spi.InitialContextFactory;
 import javax.naming.Context;
+import javax.naming.NamingException;
 import java.util.Hashtable;
 
 
@@ -53,21 +58,37 @@
 
     private final long TIME_TO_LIVE = 1000L;
 
-    Context _context;
-
-    private Connection _clientConnection, _producerConnection;
-
-    private MessageConsumer _consumer;
-    MessageProducer _producer;
-    Session _clientSession, _producerSession;
     private static final int MSG_COUNT = 50;
+    private static final long SERVER_TTL_TIMEOUT = 60000L;
 
     protected void setUp() throws Exception
     {
-        if (BROKER.startsWith("vm://"))
+        super.setUp();
+
+        if (usingInVMBroker())
         {
             TransportConnection.createVMBroker(1);
         }
+
+
+    }
+
+    private boolean usingInVMBroker()
+    {
+        return BROKER.startsWith("vm://");
+    }
+
+    protected void tearDown() throws Exception
+    {
+        if (usingInVMBroker())
+        {
+            TransportConnection.killAllVMBrokers();
+        }
+        super.tearDown();
+    }
+
+    public void testPassiveTTL() throws JMSException, NamingException
+    {
         InitialContextFactory factory = new 
PropertiesFileInitialContextFactory();
 
         Hashtable<String, String> env = new Hashtable<String, String>();
@@ -75,56 +96,40 @@
         env.put("connectionfactory.connection", "amqp://guest:[EMAIL 
PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
         env.put("queue.queue", QUEUE);
 
-        _context = factory.getInitialContext(env);
+        Context context = factory.getInitialContext(env);
 
-        Queue queue = (Queue) _context.lookup("queue");
+        Queue queue = (Queue) context.lookup("queue");
 
         //Create Client 1
-        _clientConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
+        Connection clientConnection = ((ConnectionFactory) 
context.lookup("connection")).createConnection();
 
-        _clientSession = _clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Session clientSession = clientConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-        _consumer = _clientSession.createConsumer(queue);
+        MessageConsumer consumer = clientSession.createConsumer(queue);
 
         //Create Producer
-        _producerConnection = ((ConnectionFactory) 
_context.lookup("connection")).createConnection();
-
-        _producerConnection.start();
+        Connection producerConnection = ((ConnectionFactory) 
context.lookup("connection")).createConnection();
 
-        _producerSession = _producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        _producer = _producerSession.createProducer(queue);
-    }
+        producerConnection.start();
 
-    protected void tearDown() throws Exception
-    {
-        _clientConnection.close();
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-        _producerConnection.close();
-        super.tearDown();
-        
-        if (BROKER.startsWith("vm://"))
-        {
-            TransportConnection.killAllVMBrokers();
-        }
-    }
+        MessageProducer producer = producerSession.createProducer(queue);
 
-    public void test() throws JMSException
-    {
         //Set TTL
         int msg = 0;
-        _producer.send(nextMessage(String.valueOf(msg), true));
+        producer.send(nextMessage(String.valueOf(msg), true, producerSession, 
producer));
 
-        _producer.setTimeToLive(TIME_TO_LIVE);
+        producer.setTimeToLive(TIME_TO_LIVE);
 
         for (; msg < MSG_COUNT - 2; msg++)
         {
-            _producer.send(nextMessage(String.valueOf(msg), false));
+            producer.send(nextMessage(String.valueOf(msg), false, 
producerSession, producer));
         }
 
         //Reset TTL
-        _producer.setTimeToLive(0L);
-        _producer.send(nextMessage(String.valueOf(msg), false));
+        producer.setTimeToLive(0L);
+        producer.send(nextMessage(String.valueOf(msg), false, producerSession, 
producer));
 
          try
         {
@@ -136,31 +141,71 @@
 
         }
 
-        _clientConnection.start();
+        clientConnection.start();
 
         //Receive Message 0
-        Message received = _consumer.receive(100);
+        Message received = consumer.receive(100);
         Assert.assertNotNull("First message not received", received);
         Assert.assertTrue("First message doesn't have first set.", 
received.getBooleanProperty("first"));
         Assert.assertEquals("First message has incorrect TTL.", 0L, 
received.getLongProperty("TTL"));
 
 
-        received = _consumer.receive(100);
+        received = consumer.receive(100);
         Assert.assertNotNull("Final message not received", received);
         Assert.assertFalse("Final message has first set.", 
received.getBooleanProperty("first"));
         Assert.assertEquals("Final message has incorrect TTL.", 0L, 
received.getLongProperty("TTL"));
 
-        received = _consumer.receive(100);
+        received = consumer.receive(100);
         Assert.assertNull("More messages received", received);
+
+        clientConnection.close();
+
+        producerConnection.close();
     }
 
-    private Message nextMessage(String msg, boolean first) throws JMSException
+    private Message nextMessage(String msg, boolean first, Session 
producerSession, MessageProducer producer) throws JMSException
     {
-        Message send = _producerSession.createTextMessage("Message " + msg);
+        Message send = producerSession.createTextMessage("Message " + msg);
         send.setBooleanProperty("first", first);
-        send.setLongProperty("TTL", _producer.getTimeToLive());
+        send.setLongProperty("TTL", producer.getTimeToLive());
         return send;
     }
 
+
+    /**
+     * Tests the expired messages get actively deleted even on queues which 
have no consumers
+     */
+    public void testActiveTTL() throws URLSyntaxException, AMQException, 
JMSException, InterruptedException
+    {
+        Connection producerConnection = new 
AMQConnection(BROKER,"guest","guest","activeTTLtest","test");
+        AMQSession producerSession = (AMQSession) 
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = producerSession.createTemporaryQueue();
+        producerSession.declareAndBind((AMQDestination) queue);
+        MessageProducer producer = producerSession.createProducer(queue);
+        producer.setTimeToLive(1000L);
+
+        // send Messages
+        for(int i = 0; i < MSG_COUNT; i++)
+        {
+            producer.send(producerSession.createTextMessage("Message: "+i));
+        }
+        long failureTime = System.currentTimeMillis() + 2*SERVER_TTL_TIMEOUT;
+
+        // check Queue depth for up to TIMEOUT seconds
+        long messageCount;
+
+        do
+        {
+            Thread.sleep(100);
+            messageCount = producerSession.getQueueDepth((AMQDestination) 
queue);
+        }
+        while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+        assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+    }
 
 }


Reply via email to