Author: rgreig
Date: Mon Feb  5 01:40:04 2007
New Revision: 503604

URL: http://svn.apache.org/viewvc?view=rev&rev=503604
Log:
QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue 
notification, and log notifications in log file

Added:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java

Modified: incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml Mon Feb  5 
01:40:04 2007
@@ -21,18 +21,79 @@
  -->
 <virtualhosts>
     <virtualhost>
-        <path>localhost</path>
-        <bind>direct://amq.direct//queue</bind>
-        <bind>direct://amq.direct//ping</bind>
+        <name>localhost</name>
+
+        <localhost>
+            <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+            <maximumMessageCount>5000</maximumMessageCount>
+            <queue>
+                <name>queue</name>
+                <queue>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb 
-->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb 
-->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 
mins -->
+                </queue>
+            </queue>
+            <queue>
+                <name>ping</name>
+                <ping>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb 
-->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb 
-->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 
mins -->
+                </ping>
+            </queue>
+        </localhost>
     </virtualhost>
        <virtualhost>
-        <path>development</path>
-        <bind>direct://amq.direct//queue</bind>
-        <bind>direct://amq.direct//ping</bind>
+        <name>development</name>
+        <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+        <maximumMessageCount>5000</maximumMessageCount>
+        <development>
+            <queue>
+                <name>queue</name>
+                <queue>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb 
-->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb 
-->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 
mins -->
+                </queue>
+            </queue>
+            <queue>
+                <name>ping</name>
+                <ping>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb 
-->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb 
-->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 
mins -->
+                </ping>
+            </queue>
+        </development>
     </virtualhost>
                <virtualhost>
-        <path>test</path>
-        <bind>direct://amq.direct//queue</bind>
-        <bind>direct://amq.direct//ping</bind>
+            <name>test</name>
+            <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+            <maximumMessageCount>5000</maximumMessageCount>
+            <test>
+                <queue>
+                    <name>queue</name>
+                    <queue>
+                        <exchange>amq.direct</exchange>
+                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 
4Mb -->
+                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 
2Mb -->
+                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 
mins -->
+                    </queue>
+                </queue>
+                <queue>
+                    <name>ping</name>
+                    <ping>
+                        <exchange>amq.direct</exchange>
+                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 
4Mb -->
+                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 
2Mb -->
+                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 
mins -->
+                    </ping>
+                </queue>
+            </test>
     </virtualhost>
 </virtualhosts>

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
 Mon Feb  5 01:40:04 2007
@@ -34,9 +34,13 @@
 import org.apache.log4j.Logger;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
 
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
 
 public class VirtualHostConfiguration
 {
@@ -44,11 +48,7 @@
 
     XMLConfiguration _config;
 
-    private static final String XML_VIRTUALHOST = "virtualhost";
-    private static final String XML_PATH = "path";
-    private static final String XML_BIND = "bind";
-    private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
-    private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+    private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
 
 
     public VirtualHostConfiguration(String configFile) throws 
ConfigurationException
@@ -57,137 +57,66 @@
 
         _config = new XMLConfiguration(configFile);
 
-        if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
-        {
-            throw new ConfigurationException(
-                    "Virtualhost Configuration document does not contain a 
valid virtualhost.");
-        }
     }
 
-    public void performBindings() throws AMQException, ConfigurationException, 
URLSyntaxException
-    {
-        Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
-
-        if (prop instanceof Collection)
-        {
-            _logger.debug("Number of VirtualHosts: " + ((Collection) 
prop).size());
 
-            int virtualhosts = ((Collection) prop).size();
-            for (int vhost = 0; vhost < virtualhosts; vhost++)
-            {
-                loadVirtualHost(vhost);
-            }
-        }
-        else
-        {
-            loadVirtualHost(-1);
-        }
-    }
 
-    private void loadVirtualHost(int index) throws AMQException, 
ConfigurationException, URLSyntaxException
+    private void configureVirtualHost(String virtualHostName, Configuration 
configuration) throws ConfigurationException, AMQException
     {
-        String path = XML_VIRTUALHOST;
-
-        if (index != -1)
-        {
-            path = path + "(" + index + ")";
-        }
-
-        Object prop = _config.getProperty(path + "." + XML_PATH);
-
-        if (prop == null)
-        {
-            prop = _config.getProperty(path + "." + XML_BIND);
-            String error = "Virtual Host not defined for binding";
-
-            if (prop != null)
-            {
-                if (prop instanceof Collection)
-                {
-                    error += "s";
-                }
+        _logger.debug("Loding configuration for virtaulhost: 
"+virtualHostName);
 
-                error += ": " + prop;
-            }
 
-            throw new ConfigurationException(error);
-        }
+        VirtualHost virtualHost = 
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
 
-        _logger.info("VirtualHost:'" + prop + "'");
-        String virtualHost = prop.toString();
 
-        prop = _config.getProperty(path + "." + XML_BIND);
-        if (prop instanceof Collection)
-        {
-            int bindings = ((Collection) prop).size();
-            _logger.debug("Number of Bindings: " + bindings);
-            for (int dest = 0; dest < bindings; dest++)
-            {
-                loadBinding(virtualHost, path, dest);
-            }
-        }
-        else
-        {
-            loadBinding(virtualHost,path, -1);
-        }
-    }
 
-    private void loadBinding(String virtualHost, String rootpath, int index) 
throws AMQException, ConfigurationException, URLSyntaxException
-    {
-        String path = rootpath + "." + XML_BIND;
-        if (index != -1)
+        if(virtualHost == null)
         {
-            path = path + "(" + index + ")";
+            throw new ConfigurationException("Unknown virtual host: " + 
virtualHostName);
         }
 
-        String bindingString = _config.getString(path);
-
-        AMQBindingURL binding = new AMQBindingURL(bindingString);
+        List queueNames = configuration.getList("queue.name");
 
-        _logger.debug("Loaded Binding:" + binding);
-
-        try
-        {
-            bind(virtualHost, binding);
-        }
-        catch (AMQException amqe)
+        for(Object queueNameObj : queueNames)
         {
-            _logger.info("Unable to bind url: " + binding);
-            throw amqe;
+            String queueName = String.valueOf(queueNameObj);
+            configureQueue(virtualHost, queueName, configuration);
         }
+
     }
 
-    private void bind(String virtualHostName, AMQBindingURL binding) throws 
AMQException, ConfigurationException
+    private void configureQueue(VirtualHost virtualHost, String 
queueNameString, Configuration configuration) throws AMQException, 
ConfigurationException
     {
+        CompositeConfiguration queueConfiguration = new 
CompositeConfiguration();
 
-        AMQShortString queueName = binding.getQueueName();
+        queueConfiguration.addConfiguration(configuration.subset("queue."+ 
queueNameString));
+        queueConfiguration.addConfiguration(configuration);
 
-        // This will occur if the URL is a Topic
-        if (queueName == null)
-        {
-            //todo register valid topic
-            ///queueName = binding.getDestinationName();
-            throw new AMQException("Topics cannot be bound. TODO Register 
valid topic");
-        }
-
-        //Get references to Broker Registries
-        VirtualHost virtualHost = 
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName);
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MessageStore messageStore = virtualHost.getMessageStore();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
 
+
+        AMQShortString queueName = new AMQShortString(queueNameString);
+
+        AMQQueue queue;
+
         synchronized (queueRegistry)
         {
-            AMQQueue queue = queueRegistry.getQueue(queueName);
+            queue = queueRegistry.getQueue(queueName);
 
             if (queue == null)
             {
-                _logger.info("Queue '" + binding.getQueueName() + "' does not 
exists. Creating.");
+                _logger.info("Creating queue '" + queueName + "' on virtual 
host " + virtualHost.getName());
+
+                boolean durable = queueConfiguration.getBoolean("durable" 
,false);
+                boolean autodelete = 
queueConfiguration.getBoolean("autodelete", false);
+                String owner = queueConfiguration.getString("owner", null);
 
                 queue = new AMQQueue(queueName,
-                        
Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
-                        null /* These queues will have no owner */,
-                        false /* Therefore autodelete makes no sence */, 
virtualHost);
+                        durable,
+                        owner == null ? null : new AMQShortString(owner) /* null when no owner is configured */,
+                        autodelete /* taken from the "autodelete" setting, default false */, virtualHost);
 
                 if (queue.isDurable())
                 {
@@ -198,27 +127,69 @@
             }
             else
             {
-                _logger.info("Queue '" + binding.getQueueName() + "' already 
exists not creating.");
+                _logger.info("Queue '" + queueNameString + "' already exists 
on virtual host "+virtualHost.getName()+", not creating.");
             }
 
-            Exchange defaultExchange = 
exchangeRegistry.getExchange(binding.getExchangeName());
-            synchronized (defaultExchange)
+            String exchangeName = queueConfiguration.getString("exchange", 
null);
+
+            Exchange exchange = exchangeRegistry.getExchange(exchangeName == 
null ? null : new AMQShortString(exchangeName));
+
+            if(exchange == null)
             {
-                if (defaultExchange == null)
-                {
-                    throw new ConfigurationException("Attempt to bind queue to 
unknown exchange:" + binding);
-                }
+                exchange = 
virtualHost.getExchangeRegistry().getDefaultExchange();
+            }
 
-                defaultExchange.registerQueue(queue.getName(), queue, null);
+            if (exchange == null)
+            {
+                throw new ConfigurationException("Attempt to bind queue to 
unknown exchange:" + exchangeName);
+            }
 
-                if (binding.getRoutingKey() == null || 
binding.getRoutingKey().equals(""))
+            synchronized (exchange)
+            {
+                List routingKeys = queueConfiguration.getList("routingKey");
+                if(routingKeys == null || routingKeys.isEmpty())
                 {
-                    throw new ConfigurationException("Unknown binding not 
specified on url:" + binding);
+                    routingKeys = Collections.singletonList(queue.getName());
                 }
 
-                queue.bind(binding.getRoutingKey(), defaultExchange);
+                for(Object routingKeyNameObj : routingKeys)
+                {
+                    AMQShortString routingKey = new 
AMQShortString(String.valueOf(routingKeyNameObj));
+                    exchange.registerQueue(routingKey, queue, null);
+
+                    queue.bind(routingKey, exchange);
+
+                    _logger.info("Queue '" + queue.getName() + "' bound to 
exchange:" + exchangeName + " RK:'" + routingKey + "'");
+                }
             }
-            _logger.info("Queue '" + queue.getName() + "' bound to exchange:" 
+ binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+
         }
+
+
+
+        Configurator.configure(queue, queueConfiguration);
     }
+
+
+    /**
+     * Configures every virtual host named in the configuration document.
+     *
+     * Reads the list of "virtualhost.name" entries and, for each name, hands the
+     * subset of the configuration rooted at "virtualhost.&lt;name&gt;" to
+     * configureVirtualHost.
+     *
+     * @throws ConfigurationException if the document names no virtual host, or if
+     *                                configureVirtualHost rejects one of the names
+     * @throws AMQException           if configuring a virtual host fails
+     */
+    public void performBindings() throws AMQException, ConfigurationException
+    {
+        List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
+
+        // Validate first so the list is never dereferenced (logged or iterated) when it is missing.
+        if (virtualHostNames == null || virtualHostNames.isEmpty())
+        {
+            throw new ConfigurationException(
+                    "Virtualhost Configuration document does not contain a valid virtualhost.");
+        }
+
+        // Concatenate the size directly: "a" + x == null ? ... parses as ("a" + x) == null ? ...,
+        // which would silently drop the "Configuring " prefix from the message.
+        _logger.info("Configuring " + virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
+
+        for(Object nameObject : virtualHostNames)
+        {
+            String name = String.valueOf(nameObject);
+            configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
+        }
+    }
+
+
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
 Mon Feb  5 01:40:04 2007
@@ -67,6 +67,11 @@
         _defaultExchange = exchange;
     }
 
+    public Exchange getDefaultExchange()
+    {
+        return _defaultExchange;
+    }
+
     public void unregisterExchange(AMQShortString name, boolean inUse) throws 
AMQException
     {
         // TODO: check inUse argument

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
 Mon Feb  5 01:40:04 2007
@@ -40,4 +40,6 @@
     Exchange getExchange(AMQShortString name);
 
     void setDefaultExchange(Exchange exchange);
+
+    Exchange getDefaultExchange();
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Mon Feb  5 01:40:04 2007
@@ -509,6 +509,12 @@
         _messageHandle.setRedelivered(redelivered);
     }
 
+    public long getArrivalTime()
+    {
+        return _messageHandle.getArrivalTime();
+    }
+
+
     /**
      * Called when this message is delivered to a consumer. (used to
      * implement the 'immediate' flag functionality).

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
 Mon Feb  5 01:40:04 2007
@@ -74,4 +74,6 @@
     void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) 
throws AMQException;
 
     void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) 
throws AMQException;
+
+    long getArrivalTime();
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Mon Feb  5 01:40:04 2007
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,6 @@
 import javax.management.JMException;
 import java.text.MessageFormat;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -130,22 +130,36 @@
     /**
      * max allowed size(KB) of a single message
      */
-    private long _maxMessageSize = 10000;
+    private long _maximumMessageSize = 10000;
 
     /**
      * max allowed number of messages on a queue.
      */
-    private Integer _maxMessageCount = 10000;
+    @Configured(path = "maximumMessageCount", defaultValue = "0")
+    public int _maximumMessageCount;
 
     /**
-     * max queue depth(KB) for the queue
+     * max queue depth for the queue
      */
-    private long _maxQueueDepth = 10000000;
+    @Configured(path = "maximumQueueDepth", defaultValue = "0")
+    public long _maximumQueueDepth = 10000000;
+
+/*
+     * maximum message age before alerts occur
+     */
+    @Configured(path = "maximumMessageAge", defaultValue = "0")
+    public long _maximumMessageAge = 30000; //0
+
+    /*
+     * the minimum interval between sending out consecutive alerts of the same type
+     */
+    @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+    public long _minimumAlertRepeatGap = 30000;
 
     /**
      * total messages received by the queue since startup.
      */
-    private long _totalMessagesReceived = 0;
+    public long _totalMessagesReceived = 0;
 
     public int compareTo(Object o)
     {
@@ -286,50 +300,56 @@
         return _managedObject;
     }
 
-    public Long getMaximumMessageSize()
+    public long getMaximumMessageSize()
     {
-        return _maxMessageSize;
+        return _maximumMessageSize;
     }
 
-    public void setMaximumMessageSize(Long value)
+    public void setMaximumMessageSize(long value)
     {
-        _maxMessageSize = value;
+        _maximumMessageSize = value;
     }
 
-    public Integer getConsumerCount()
+    public int getConsumerCount()
     {
         return _subscribers.size();
     }
 
-    public Integer getActiveConsumerCount()
+    public int getActiveConsumerCount()
     {
         return _subscribers.getWeight();
     }
 
-    public Long getReceivedMessageCount()
+    public long getReceivedMessageCount()
     {
         return _totalMessagesReceived;
     }
 
-    public Integer getMaximumMessageCount()
+    public int getMaximumMessageCount()
     {
-        return _maxMessageCount;
+        return _maximumMessageCount;
     }
 
-    public void setMaximumMessageCount(Integer value)
+    public void setMaximumMessageCount(int value)
     {
-        _maxMessageCount = value;
+        _maximumMessageCount = value;
     }
 
     public long getMaximumQueueDepth()
     {
-        return _maxQueueDepth;
+        return _maximumQueueDepth;
     }
 
     // Sets the queue depth, the max queue size
     public void setMaximumQueueDepth(long value)
     {
-        _maxQueueDepth = value;
+        _maximumQueueDepth = value;
+    }
+
+    public long getOldestMessageArrivalTime()
+    {
+        return _deliveryMgr.getOldestMessageArrival();
+        
     }
 
     /**
@@ -631,6 +651,24 @@
         _deleteTaskList.add(task);
     }
 
+    public long getMinimumAlertRepeatGap()
+    {
+        return _minimumAlertRepeatGap;
+    }
 
+    public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+    {
+        _minimumAlertRepeatGap = minimumAlertRepeatGap;
+    }
+
+    public long getMaximumMessageAge()
+    {
+        return _maximumMessageAge;
+    }
+
+    public void setMaximumMessageAge(long maximumMessageAge)
+    {
+        _maximumMessageAge = maximumMessageAge;
+    }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Mon Feb  5 01:40:04 2007
@@ -22,12 +22,12 @@
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.Main;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
 
 import javax.management.openmbean.*;
 import javax.management.*;
@@ -41,8 +41,11 @@
  * for an AMQQueue.
  */
 @MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, 
QueueNotificationListener
 {
+
+    private static final Logger _logger = 
Logger.getLogger(AMQQueueMBean.class);
+
     /**
      * Since the MBean is not associated with a real channel we can safely 
create our own store context
      * for use in the few methods that require one.
@@ -63,6 +66,9 @@
     private final static String[] _msgContentAttributes = {"AMQ MessageId", 
"MimeType", "Encoding", "Content"};
     private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
 
+
+    private final long[] _lastNotificationTimes = new 
long[NotificationCheck.values().length];
+
     @MBeanConstructor("Creates an MBean exposing an AMQQueue")
     public AMQQueueMBean(AMQQueue queue) throws JMException
     {
@@ -213,38 +219,38 @@
         return msg.getContentHeaderBody().bodySize;
     }
 
+
+
     /**
      * Checks if there is any notification to be send to the listeners
      */
     public void checkForNotification(AMQMessage msg) throws AMQException, 
JMException
     {
-        // Check for threshold message count
-        Integer msgCount = getMessageCount();
-        if (msgCount >= getMaximumMessageCount())
-        {
-            notifyClients("Message count(" + msgCount + ") has reached or 
exceeded the threshold high value");
-        }
 
-        // Check for threshold message size
-        long messageSize = getMessageSize(msg);
-        if (messageSize >= _queue.getMaximumMessageSize())
-        {
-            notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" 
+ messageSize + " bytes) is higher than the threshold value");
-        }
+        final long currentTime = System.currentTimeMillis();
+        final long thresholdTime =  currentTime - 
_queue.getMinimumAlertRepeatGap();
 
-        // Check for threshold queue depth in bytes
-        long queueDepth = getQueueDepthKb();
-        if (queueDepth >= _queue.getMaximumQueueDepth())
+        for(NotificationCheck check : NotificationCheck.values())
         {
-            notifyClients("Queue depth(" + queueDepth + "), Queue size has 
reached the threshold high value");
+            if(check.isMessageSpecific() || 
_lastNotificationTimes[check.ordinal()]<thresholdTime)
+            {
+                if(check.notifyIfNecessary(msg, _queue, this))
+                {
+                    _lastNotificationTimes[check.ordinal()] = currentTime;
+                }
+            }
         }
+
     }
 
     /**
      * Sends the notification to the listeners
      */
-    private void notifyClients(String notificationMsg)
+    public void notifyClients(NotificationCheck notification, AMQQueue queue, 
String notificationMsg)
     {
+        // important : add log to the log file - monitoring tools may be 
looking for this
+        _logger.info(notification.name() + " On Queue " + queue.getName() + " 
- " + notificationMsg);
+
         Notification n = new 
Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
                 ++_notificationSequenceNumber, System.currentTimeMillis(), 
notificationMsg);
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Mon Feb  5 01:40:04 2007
@@ -160,6 +160,12 @@
         return _totalMessageSize.get();
     }
 
+    public long getOldestMessageArrival()
+    {
+        AMQMessage msg = _messages.peek();
+        return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+    }
+
 
     public synchronized List<AMQMessage> getMessages()
     {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
 Mon Feb  5 01:40:04 2007
@@ -83,4 +83,6 @@
     boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
 
     long getTotalMessageSize();
+
+    long getOldestMessageArrival();
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
 Mon Feb  5 01:40:04 2007
@@ -43,6 +43,8 @@
 
     private boolean _redelivered;
 
+    private long _arrivalTime;
+
     public InMemoryMessageHandle()
     {
     }
@@ -114,6 +116,7 @@
     {
         _publishBody = publishBody;
         _contentHeaderBody = contentHeaderBody;
+        _arrivalTime = System.currentTimeMillis();
     }
 
     public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -130,4 +133,10 @@
     {
         // NO OP
     }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
 Mon Feb  5 01:40:04 2007
@@ -31,11 +31,19 @@
 
     private int _contentChunkCount;
 
+    private long _arrivalTime;
+
     public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
     {
+        this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+    }
+
+    public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+    {
         _contentHeaderBody = contentHeaderBody;
         _publishBody = publishBody;
         _contentChunkCount = contentChunkCount;
+        _arrivalTime = arrivalTime;
     }
 
     public int getContentChunkCount()
@@ -66,5 +74,15 @@
     public void setPublishBody(BasicPublishBody publishBody)
     {
         _publishBody = publishBody;
+    }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
+    public void setArrivalTime(long arrivalTime)
+    {
+        _arrivalTime = arrivalTime;
     }
 }

Added: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?view=auto&rev=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
 Mon Feb  5 01:40:04 2007
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+    MESSAGE_COUNT_ALERT
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            int msgCount = queue.getMessageCount();
+            final int maximumMessageCount = queue.getMaximumMessageCount();
+            if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+            {
+                listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+                return true;
+            }
+            return false;
+        }
+    },
+    MESSAGE_SIZE_ALERT(true)
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            final long maximumMessageSize = queue.getMaximumMessageSize();
+            if(maximumMessageSize != 0)
+            {
+                // Check for threshold message size
+                long messageSize;
+                try
+                {
+                    messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+                }
+                catch (AMQException e)
+                {
+                    messageSize = 0;
+                }
+
+
+                if (messageSize >= maximumMessageSize)
+                {
+                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    },
+    QUEUE_DEPTH_ALERT
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            // Check for threshold queue depth in bytes
+            final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+            if(maximumQueueDepth != 0)
+            {
+                final long queueDepth = queue.getQueueDepth();
+
+                if (queueDepth >= maximumQueueDepth)
+                {
+                    listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    },
+    MESSAGE_AGE_ALERT
+    {
+        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+
+            final long maxMessageAge = queue.getMaximumMessageAge();
+            if(maxMessageAge != 0)
+            {
+                final long currentTime = System.currentTimeMillis();
+                final long thresholdTime = currentTime - maxMessageAge;
+                final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+                if(firstArrivalTime < thresholdTime)
+                {
+                    long oldestAge = currentTime - firstArrivalTime;
+                    listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+                    return true;
+                }
+            }
+            return false;
+                    
+        }
+
+    }
+    ;
+
+    private final boolean _messageSpecific;
+
+    NotificationCheck()
+    {
+        this(false);
+    }
+
+    NotificationCheck(boolean messageSpecific)
+    {
+        _messageSpecific = messageSpecific;
+    }
+
+    public boolean isMessageSpecific()
+    {
+        return _messageSpecific;
+    }
+
+    abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}

Added: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java?view=auto&rev=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
 Mon Feb  5 01:40:04 2007
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface QueueNotificationListener
+{
+    void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=503604&r1=503603&r2=503604
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Mon Feb  5 01:40:04 2007
@@ -49,6 +49,8 @@
 
     private final MessageStore _messageStore;
 
+    private long _arrivalTime;
+
 
     public WeakReferenceMessageHandle(MessageStore messageStore)
     {
@@ -60,14 +62,27 @@
         ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
         if (chb == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+            MessageMetaData mmd = loadMessageMetaData(messageId);
             chb = mmd.getContentHeaderBody();
-            _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
-            _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
         }
         return chb;
     }
 
+    private MessageMetaData loadMessageMetaData(Long messageId)
+            throws AMQException
+    {
+        MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+        populateFromMessageMetaData(mmd);
+        return mmd;
+    }
+
+    private void populateFromMessageMetaData(MessageMetaData mmd)
+    {
+        _arrivalTime = mmd.getArrivalTime();
+        _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+        _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+    }
+
     public int getBodyCount(Long messageId) throws AMQException
     {
         if (_contentBodies == null)
@@ -136,10 +151,9 @@
         BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
         if (bpb == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+            MessageMetaData mmd = loadMessageMetaData(messageId);
+
             bpb = mmd.getPublishBody();
-            _publishBody = new WeakReference<BasicPublishBody>(bpb);
-            _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
         }
         return bpb;
     }
@@ -179,10 +193,15 @@
         {
             _contentBodies = new LinkedList<WeakReference<ContentBody>>();
         }
-        _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
-                                                                               
         _contentBodies.size()));
-        _publishBody = new WeakReference<BasicPublishBody>(publishBody);
-        _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
+
+        final long arrivalTime = System.currentTimeMillis();
+
+
+        MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
+
+        _messageStore.storeMessageMetaData(storeContext, messageId, mmd);
+
+        populateFromMessageMetaData(mmd);
     }
 
     public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -199,4 +218,10 @@
     {
         _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
     }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
 }


Reply via email to