Author: bhupendrab
Date: Tue Mar 13 06:11:31 2007
New Revision: 517678

URL: http://svn.apache.org/viewvc?view=rev&rev=517678
Log:
QPID-408 Queue Depth should be reduced when message is polled from the queue.

Added:
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
   (with props)
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java

Added: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java?view=auto&rev=517678
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
 Tue Mar 13 06:11:31 2007
@@ -0,0 +1,295 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods 
that are not used are left as stubs,
+ * so if a test needs one of the unimplemented methods, please implement it 
here.
+ */
+public class TestIoSession implements IoSession
+{
+    private final ConcurrentMap attributes = new ConcurrentHashMap();
+
+    public TestIoSession()
+    {
+    }
+
+    public IoService getService()
+    {
+        return null;
+    }
+
+    public IoServiceConfig getServiceConfig()
+    {
+        return new TestIoConfig();
+    }
+
+    public IoHandler getHandler()
+    {
+        return null;
+    }
+
+    public IoSessionConfig getConfig()
+    {
+        return null;
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return null;
+    }
+
+    public WriteFuture write(Object message)
+    {
+        return null;
+    }
+
+    public CloseFuture close()
+    {
+        return null;
+    }
+
+    public Object getAttachment()
+    {
+        return getAttribute("");
+    }
+
+    public Object setAttachment(Object attachment)
+    {
+        return setAttribute("",attachment);
+    }
+
+    public Object getAttribute(String key)
+    {
+        return attributes.get(key);
+    }
+
+    public Object setAttribute(String key, Object value)
+    {
+        return attributes.put(key,value);
+    }
+
+    public Object setAttribute(String key)
+    {
+        return attributes.put(key, Boolean.TRUE);
+    }
+
+    public Object removeAttribute(String key)
+    {
+        return attributes.remove(key);
+    }
+
+    public boolean containsAttribute(String key)
+    {
+        return attributes.containsKey(key);
+    }
+
+    public Set getAttributeKeys()
+    {
+        return attributes.keySet();
+    }
+
+    public TransportType getTransportType()
+    {
+        return null;
+    }
+
+    public boolean isConnected()
+    {
+        return false;
+    }
+
+    public boolean isClosing()
+    {
+        return false;
+    }
+
+    public CloseFuture getCloseFuture()
+    {
+        return null;
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return new InetSocketAddress("127.0.0.1", 1234);
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return null;
+    }
+
+    public SocketAddress getServiceAddress()
+    {
+        return null;
+    }
+
+    public int getIdleTime(IdleStatus status)
+    {
+        return 0;
+    }
+
+    public long getIdleTimeInMillis(IdleStatus status)
+    {
+        return 0;
+    }
+
+    public void setIdleTime(IdleStatus status, int idleTime)
+    {
+
+    }
+
+    public int getWriteTimeout()
+    {
+        return 0;
+    }
+
+    public long getWriteTimeoutInMillis()
+    {
+        return 0;
+    }
+
+    public void setWriteTimeout(int writeTimeout)
+    {
+
+    }
+
+    public TrafficMask getTrafficMask()
+    {
+        return null;
+    }
+
+    public void setTrafficMask(TrafficMask trafficMask)
+    {
+
+    }
+
+    public void suspendRead()
+    {
+
+    }
+
+    public void suspendWrite()
+    {
+
+    }
+
+    public void resumeRead()
+    {
+
+    }
+
+    public void resumeWrite()
+    {
+
+    }
+
+    public long getReadBytes()
+    {
+        return 0;
+    }
+
+    public long getWrittenBytes()
+    {
+        return 0;
+    }
+
+    public long getReadMessages()
+    {
+        return 0;
+    }
+
+    public long getWrittenMessages()
+    {
+        return 0;
+    }
+
+    public long getWrittenWriteRequests()
+    {
+        return 0;
+    }
+
+    public int getScheduledWriteRequests()
+    {
+        return 0;
+    }
+
+    public int getScheduledWriteBytes()
+    {
+        return 0;
+    }
+
+    public long getCreationTime()
+    {
+        return 0;
+    }
+
+    public long getLastIoTime()
+    {
+        return 0;
+    }
+
+    public long getLastReadTime()
+    {
+        return 0;
+    }
+
+    public long getLastWriteTime()
+    {
+        return 0;
+    }
+
+    public boolean isIdle(IdleStatus status)
+    {
+        return false;
+    }
+
+    public int getIdleCount(IdleStatus status)
+    {
+        return 0;
+    }
+
+    public long getLastIdleTime(IdleStatus status)
+    {
+        return 0; 
+    }
+
+    /**
+     * Test implementation of IoServiceConfig
+     */
+    private class TestIoConfig extends SocketAcceptorConfig
+    {
+        public ThreadModel getThreadModel()
+        {
+            return ReadWriteThreadModel.getInstance();
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java?view=auto&rev=517678
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
 Tue Mar 13 06:11:31 2007
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
+
+public class TestMinaProtocolSession extends AMQMinaProtocolSession
+{
+    public TestMinaProtocolSession() throws AMQException
+    {
+        super(new TestIoSession(),
+              ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+              new AMQCodecFactory(true));
+    }
+
+    public ProtocolOutputConverter getProtocolOutputConverter()
+    {
+        return ProtocolOutputConverterRegistry.getConverter(this);
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return (byte)8;
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return (byte)0;
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?view=diff&rev=517678&r1=517677&r2=517678
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Tue Mar 13 06:11:31 2007
@@ -28,6 +28,9 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -46,6 +49,7 @@
     private AMQQueue _queue;
     private AMQQueueMBean _queueMBean;
     private VirtualHost _virtualHost;
+    private AMQMinaProtocolSession protocolSession = null;
     private MessageStore _messageStore = new MemoryMessageStore();
     private StoreContext _storeContext = new StoreContext();
     private TransactionalContext _transactionalContext = new 
NonTransactionalContext(_messageStore, _storeContext,
@@ -104,7 +108,7 @@
      *
      * @throws Exception
      */
-    public void testQueueDepthAlert() throws Exception
+    public void testQueueDepthAlertNoSubscriber() throws Exception
     {
         _queue = new AMQQueue(new AMQShortString("testQueue3"), false,  new 
AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost);
@@ -153,6 +157,70 @@
         
assertTrue(notificationMsg.startsWith(NotificationCheck.MESSAGE_AGE_ALERT.name()));
     }
 
+    /*
+     This test sends some messages to the queue with subscribers needing 
messages to be acknowledged.
+     The messages will not be acknowledged and will be requeued twice. We 
check this because
+     the reported bug said that the queueDepth keeps increasing when messages 
are requeued.
+     The QueueDepth should decrease when messages are delivered from the queue 
(QPID-408)
+    */
+    public void testQueueDepthAlertWithSubscribers() throws Exception
+    {
+        protocolSession = new TestMinaProtocolSession();
+        AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, 
null);
+        protocolSession.addChannel(channel);
+
+        // Create queue
+        _queue = getNewQueue();
+        _queue.registerProtocolSession(protocolSession, channel.getChannelId(),
+                                       new AMQShortString("consumer_tag"), 
true, null, false, false);
+        
+        _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+        _queueMBean.setMaximumMessageCount(9999);   // Set a high value, 
because this is not being tested
+        _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+        // Send messages (slightly more than the number needed to cause a 
Queue_Depth alert)
+        int messageCount = Math.round(MAX_QUEUE_DEPTH/MAX_MESSAGE_SIZE) + 10;
+        long totalSize = (messageCount * MAX_MESSAGE_SIZE) >> 10;
+        sendMessages(messageCount, MAX_MESSAGE_SIZE);
+
+        // Check queueDepth. There should be no messages on the queue and, as 
the subscriber is listening,
+        // there should be no Queue_Depth alert raised
+        assertTrue(_queueMBean.getQueueDepth() == 0);
+        Notification lastNotification = _queueMBean.getLastNotification();
+        assertNull(lastNotification);
+
+        // Kill the subscriber and check for the queue depth values.
+        // Messages are unacknowledged, so those should get requeued. All 
messages should be on the Queue
+        _queue.unregisterProtocolSession(protocolSession, 
channel.getChannelId(), new AMQShortString("consumer_tag"));
+        channel.requeue();
+
+        assertTrue(_queueMBean.getQueueDepth() == totalSize);
+
+        lastNotification = _queueMBean.getLastNotification();
+        assertNotNull(lastNotification);
+        String notificationMsg = lastNotification.getMessage();
+        
assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+
+
+        // Connect a consumer again and check QueueDepth values. The queue 
should get emptied.
+        // Messages will get delivered but still are unacknowledged.
+        _queue.registerProtocolSession(protocolSession, channel.getChannelId(),
+                                       new AMQShortString("consumer_tag"), 
true, null, false, false);
+        _queue.deliverAsync();
+        while (_queue.getMessageCount() != 0)
+        {
+            Thread.sleep(100);
+        }
+        assertTrue(_queueMBean.getQueueDepth() == 0);
+
+        // Kill the subscriber again. Now those messages should get requeued 
again. Check if the queue depth
+        // value is correct.
+        _queue.unregisterProtocolSession(protocolSession, 
channel.getChannelId(), new AMQShortString("consumer_tag"));
+        channel.requeue();
+
+        assertTrue(_queueMBean.getQueueDepth() == totalSize);
+        protocolSession.closeSession();
+    }
     protected AMQMessage message(final boolean immediate, long size) throws 
AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
@@ -183,6 +251,7 @@
         contentHeaderBody.bodySize = size;   // in bytes
         AMQMessage message = new AMQMessage(_messageStore.getNewMessageId(), 
publish, _transactionalContext);
         message.setContentHeaderBody(contentHeaderBody);
+        message.setPublisher(protocolSession);
         return message;
     }
 
@@ -208,5 +277,14 @@
         {
             _queue.process(_storeContext, messages[i], false);
         }
+    }
+
+    private AMQQueue getNewQueue() throws AMQException
+    {
+        return new AMQQueue(new AMQShortString("testQueue" + Math.random()),
+                            false,
+                            new AMQShortString("AMQueueAlertTest"),
+                            false,
+                            _virtualHost);
     }
 }


Reply via email to