Author: bhupendrab
Date: Wed Mar  7 03:39:21 2007
New Revision: 515539

URL: http://svn.apache.org/viewvc?view=rev&rev=515539
Log:
1. Fixed the AMQQueueMBeanTest failures due to changes in 
AMQQueuMBean.getQueueDepth() from queueDepth/1000 to (queueDepth >> 10)
2.
Revision: 513748
Author: bhupendrab
Date: 13:26:51, 02 March 2007
Message:
QPID-390
Added test case for all the AMQQueue alerts
----
Modified : 
/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Added : 
/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java



Added:
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=515539&r1=515538&r2=515539
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Wed Mar  7 03:39:21 2007
@@ -81,8 +81,8 @@
     private final static String[] _msgContentAttributes = {"AMQ MessageId", 
"MimeType", "Encoding", "Content"};
     private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
 
-
     private final long[] _lastNotificationTimes = new 
long[NotificationCheck.values().length];
+    private Notification _lastNotification = null;
 
     @MBeanConstructor("Creates an MBean exposing an AMQQueue")
     public AMQQueueMBean(AMQQueue queue) throws JMException
@@ -256,11 +256,17 @@
     {
         // important : add log to the log file - monitoring tools may be 
looking for this
         _logger.info(notification.name() + " On Queue " + queue.getName() + " 
- " + notificationMsg);
-
-        Notification n = new 
Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+        notificationMsg = notification.name() + " " + notificationMsg;
+        
+        _lastNotification = new 
Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
                 ++_notificationSequenceNumber, System.currentTimeMillis(), 
notificationMsg);
 
-        _broadcaster.sendNotification(n);
+        _broadcaster.sendNotification(_lastNotification);
+    }
+
+    public Notification getLastNotification()
+    {
+        return _lastNotification;
     }
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=515539&r1=515538&r2=515539
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Mar  7 03:39:21 2007
@@ -354,14 +354,9 @@
     public void removeAMessageFromTop(StoreContext storeContext) throws 
AMQException
     {
         _lock.lock();
-        AMQMessage msg = getNextMessage();
-        if (msg != null)
-        {
-            // mark this message as taken and get it removed
-            msg.taken(null);
-            _queue.dequeue(storeContext, msg);
-            getNextMessage();
-        }
+        
+        AMQMessage message = _messages.poll();
+        _totalMessageSize.addAndGet(-message.getSize());
 
         _lock.unlock();
     }

Added: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?view=auto&rev=515539
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Wed Mar  7 03:39:21 2007
@@ -0,0 +1,212 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import javax.management.Notification;
+import java.util.LinkedList;
+import java.util.HashSet;
+
+/** This class tests all the alerts an AMQQueue can throw based on threshold 
values of different parameters */
+public class AMQQueueAlertTest extends TestCase
+{
+    private final static int MAX_MESSAGE_COUNT = 50;
+    private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
+    private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
+    private final static long MAX_QUEUE_DEPTH = 10000;  // 10 KB
+    private AMQQueue _queue;
+    private AMQQueueMBean _queueMBean;
+    private VirtualHost _virtualHost;
+    private MessageStore _messageStore = new MemoryMessageStore();
+    private StoreContext _storeContext = new StoreContext();
+    private TransactionalContext _transactionalContext = new 
NonTransactionalContext(_messageStore, _storeContext,
+                                                                               
      null,
+                                                                               
      new LinkedList<RequiredDeliveryException>(),
+                                                                               
      new HashSet<Long>());
+
+    /**
+     * Tests if the alert gets thrown when message count increases the 
threshold limit
+     *
+     * @throws Exception
+     */
+    public void testMessageCountAlert() throws Exception
+    {
+        _queue = new AMQQueue(new AMQShortString("testQueue1"), false,  new 
AMQShortString("AMQueueAlertTest"),
+                              false, _virtualHost);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+
+        sendMessages(MAX_MESSAGE_COUNT, 256l);
+        assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        
assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_COUNT_ALERT.name()));
+    }
+
+    /**
+     * Tests if the Message Size alert gets thrown when message of higher than 
threshold limit is sent
+     *
+     * @throws Exception
+     */
+    public void testMessageSizeAlert() throws Exception
+    {
+        _queue = new AMQQueue(new AMQShortString("testQueue2"), false,  new 
AMQShortString("AMQueueAlertTest"),
+                              false, _virtualHost);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+        _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
+
+        sendMessages(1, MAX_MESSAGE_SIZE * 2);
+        assertTrue(_queueMBean.getMessageCount() == 1);
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        
assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_SIZE_ALERT.name()));
+    }
+
+    /**
+     * Tests if Queue Depth alert is thrown when queue depth reaches the 
threshold value
+     *
+     * @throws Exception
+     */
+    public void testQueueDepthAlert() throws Exception
+    {
+        _queue = new AMQQueue(new AMQShortString("testQueue3"), false,  new 
AMQShortString("AMQueueAlertTest"),
+                              false, _virtualHost);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+        _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+        while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
+        {
+            sendMessages(1, MAX_MESSAGE_SIZE);
+        }
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        
assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+    }
+
+    /**
+     * Tests if MESSAGE AGE alert is thrown, when a message is in the queue 
for time higher than threshold value of
+     * message age
+     *
+     * @throws Exception
+     */
+    public void testMessageAgeAlert() throws Exception
+    {
+        _queue = new AMQQueue(new AMQShortString("testQueue4"), false,  new 
AMQShortString("AMQueueAlertTest"),
+                              false, _virtualHost);
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+        _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
+
+        sendMessages(1, MAX_MESSAGE_SIZE);
+
+        // Ensure message sits on queue long enough to age.
+        Thread.sleep(MAX_MESSAGE_AGE * 2);
+
+        sendMessages(1, MAX_MESSAGE_SIZE);
+        assertTrue(_queueMBean.getMessageCount() == 2);
+
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+
+        String notificationMsg = lastNotification.getMessage();
+        
assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
+    }
+
+    protected AMQMessage message(final boolean immediate, long size) throws 
AMQException
+    {
+        MessagePublishInfo publish = new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return null;
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return false;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return null;
+            }
+        };
+
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        contentHeaderBody.bodySize = size;   // in bytes
+        AMQMessage message = new AMQMessage(_messageStore.getNewMessageId(), 
publish, _transactionalContext);
+        message.setContentHeaderBody(contentHeaderBody);
+        return message;
+    }
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        IApplicationRegistry applicationRegistry = 
ApplicationRegistry.getInstance();
+        _virtualHost = 
applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+    }
+
+    private void sendMessages(int messageCount, long size) throws AMQException
+    {
+        AMQMessage[] messages = new AMQMessage[messageCount];
+        for (int i = 0; i < messages.length; i++)
+        {
+            messages[i] = message(false, size);
+            messages[i].enqueue(_queue);
+            messages[i].routingComplete(_messageStore, _storeContext, new 
MessageHandleFactory());
+        }
+
+        for (int i = 0; i < messageCount; i++)
+        {
+            _queue.process(_storeContext, messages[i], false);
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=515539&r1=515538&r2=515539
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Wed Mar  7 03:39:21 2007
@@ -42,6 +42,7 @@
  */
 public class AMQQueueMBeanTest extends TestCase
 {
+    private static long MESSAGE_SIZE = 1000;
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private QueueRegistry _queueRegistry;
@@ -61,7 +62,8 @@
         sendMessages(messageCount);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
-        assertTrue(_queueMBean.getQueueDepth() == 10);
+        long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+        assertTrue(_queueMBean.getQueueDepth() == queueDepth);
 
         _queueMBean.deleteMessageFromTop();
         assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
@@ -101,13 +103,14 @@
 
     public void testGeneralProperties()
     {
+        long maxQueueDepth = 1000; // in bytes
         _queueMBean.setMaximumMessageCount(50000);
         _queueMBean.setMaximumMessageSize(2000l);
-        _queueMBean.setMaximumQueueDepth(1000l);
+        _queueMBean.setMaximumQueueDepth(maxQueueDepth);
 
         assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
         assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
-        assertTrue(_queueMBean.getMaximumQueueDepth() == 1000);
+        assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 
10));
 
         assertTrue(_queueMBean.getName().equals("testQueue"));
         assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
@@ -150,8 +153,10 @@
         AMQMessage msg = message(false);
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
-        _queue.process(_storeContext, msg, false);
+
+        msg.enqueue(_queue);
         msg.routingComplete(_messageStore, _storeContext, new 
MessageHandleFactory());
+        _queue.process(_storeContext, msg, false);
         _queueMBean.viewMessageContent(id);
         try
         {
@@ -212,15 +217,12 @@
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false);
+            messages[i].enqueue(_queue);
+            messages[i].routingComplete(_messageStore, _storeContext, new 
MessageHandleFactory());
         }
         for (int i = 0; i < messageCount; i++)
         {
             _queue.process(_storeContext, messages[i], false);
-        }
-
-        for (int i = 0; i < messages.length; i++)
-        {
-            messages[i].routingComplete(_messageStore, _storeContext, new 
MessageHandleFactory());
         }
     }
 }


Reply via email to