Author: aidan
Date: Thu Aug 21 06:53:28 2008
New Revision: 687764
URL: http://svn.apache.org/viewvc?rev=687764&view=rev
Log:
QPID-1167: reset queue notification lists when creating queues. Pull out
defaults centrally.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
Thu Aug 21 06:53:28 2008
@@ -183,13 +183,6 @@
_messageStore.createQueue(queue);
}
- Configuration virtualHostDefaultQueueConfiguration =
- VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
- if (virtualHostDefaultQueueConfiguration != null)
- {
- Configurator.configure(queue,
virtualHostDefaultQueueConfiguration);
- }
-
_queueRegistry.registerQueue(queue);
}
catch (AMQException ex)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Thu Aug 21 06:53:28 2008
@@ -130,13 +130,13 @@
}
}
- public static CompositeConfiguration getDefaultQueueConfiguration(AMQQueue
queue)
+ public static CompositeConfiguration
getDefaultQueueConfiguration(VirtualHost host)
{
CompositeConfiguration queueConfiguration = null;
if (_config == null)
return null;
- Configuration vHostConfiguration =
_config.subset(VIRTUALHOST_PROPERTY_BASE + queue.getVirtualHost().getName());
+ Configuration vHostConfiguration =
_config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName());
if (vHostConfiguration == null)
return null;
@@ -193,7 +193,10 @@
queue = AMQQueueFactory.createAMQQueueImpl(queueName,
durable,
owner == null ? null : new AMQShortString(owner) /*
These queues will have no owner */,
- autodelete /* Therefore autodelete makes no sence */,
virtualHost, arguments);
+ autodelete /* Therefore autodelete makes no sence */,
+ virtualHost,
+ arguments,
+ queueConfiguration);
if (queue.isDurable())
{
@@ -247,10 +250,6 @@
}
}
-
-
-
- Configurator.configure(queue, queueConfiguration);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
Thu Aug 21 06:53:28 2008
@@ -203,13 +203,7 @@
}
});
}// if exclusive and not durable
-
- Configuration virtualHostDefaultQueueConfiguration =
VirtualHostConfiguration.getDefaultQueueConfiguration(queue);
- if (virtualHostDefaultQueueConfiguration != null)
- {
- Configurator.configure(queue,
virtualHostDefaultQueueConfiguration);
- }
-
+
return queue;
}
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Thu Aug 21 06:53:28 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.exchange.Exchange;
@@ -207,4 +209,6 @@
{
public void doTask(AMQQueue queue) throws AMQException;
}
+
+ void configure(Configuration virtualHostDefaultQueueConfiguration);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
Thu Aug 21 06:53:28 2008
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.Configuration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
@@ -31,22 +33,43 @@
public static final AMQShortString X_QPID_PRIORITIES = new
AMQShortString("x-qpid-priorities");
public static AMQQueue createAMQQueueImpl(AMQShortString name,
+ boolean durable,
+ AMQShortString owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, final FieldTable arguments)
+
+ throws AMQException
+ {
+
+ return createAMQQueueImpl(name, durable, owner, autoDelete,
+ virtualHost, arguments,
+
VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost));
+ }
+
+ public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
boolean autoDelete,
- VirtualHost virtualHost, final
FieldTable arguments)
+ VirtualHost virtualHost, final
FieldTable arguments,
+ Configuration queueConfiguration)
throws AMQException
{
final int priorities = arguments == null ? 1 :
arguments.containsKey(X_QPID_PRIORITIES) ?
arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ AMQQueue q = null;
if(priorities > 1)
{
- return new AMQPriorityQueue(name, durable, owner, autoDelete,
virtualHost, priorities);
+ q = new AMQPriorityQueue(name, durable, owner, autoDelete,
virtualHost, priorities);
}
else
{
- return new SimpleAMQQueue(name, durable, owner, autoDelete,
virtualHost);
+ q = new SimpleAMQQueue(name, durable, owner, autoDelete,
virtualHost);
+ }
+ if (q != null && queueConfiguration != null)
+ {
+ q.configure(queueConfiguration);
}
+ return q;
}
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Thu Aug 21 06:53:28 2008
@@ -3,6 +3,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MessageStore;
@@ -14,6 +15,7 @@
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.configuration.Configured;
+import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import javax.management.JMException;
@@ -160,12 +162,17 @@
throw new AMQException("AMQQueue MBean creation has failed ", e);
}
+ resetNotifications();
+
+ }
+
+ private void resetNotifications()
+ {
// This ensure that the notification checks for the configured alerts
are created.
setMaximumMessageAge(_maximumMessageAge);
setMaximumMessageCount(_maximumMessageCount);
setMaximumMessageSize(_maximumMessageSize);
setMaximumQueueDepth(_maximumQueueDepth);
-
}
// ------ Getters and Setters
@@ -1635,4 +1642,10 @@
}
return ids;
}
+
+ public void configure(Configuration queueConfiguration)
+ {
+ Configurator.configure(this, queueConfiguration);
+ resetNotifications();
+ }
}
\ No newline at end of file
Modified:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=687764&r1=687763&r2=687764&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Thu Aug 21 06:53:28 2008
@@ -40,6 +40,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
@@ -47,6 +48,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
+import java.util.Set;
/** This class tests all the alerts an AMQQueue can throw based on threshold
values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -251,6 +253,26 @@
assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
}
+ public void testAlertConfiguration() throws AMQException
+ {
+ // Setup configuration
+ CompositeConfiguration config = new CompositeConfiguration();
+ config.setProperty("maximumMessageSize", new Long(23));
+ config.setProperty("maximumMessageCount", new Long(24));
+ config.setProperty("maximumQueueDepth", new Long(25));
+ config.setProperty("maximumMessageAge", new Long(26));
+
+ // Create queue and set config
+ _queue = getNewQueue();
+ _queue.configure(config);
+
+ // Check alerts and notifications
+ Set<NotificationCheck> checks = _queue.getNotificationChecks();
+ assertNotNull("No checks found", checks);
+ assertFalse("Checks should not be empty", checks.isEmpty());
+ assertEquals("Wrong number of checks", 4, checks.size());
+ }
+
protected IncomingMessage message(final boolean immediate, long size)
throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()