Author: aidan
Date: Fri Aug  8 03:31:26 2008
New Revision: 683932

URL: http://svn.apache.org/viewvc?rev=683932&view=rev
Log:
QPID-1224: add methods to get the list of message ids from a queue, with 
optional offset. Test class for this.

Added:
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=683932&r1=683931&r2=683932&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Fri Aug  8 03:31:26 2008
@@ -104,6 +104,10 @@
 
     List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long 
toMessageId);
 
+    List<Long> getMessagesOnTheQueue(int num);
+
+    List<Long> getMessagesOnTheQueue(int num, int offest);
+
     QueueEntry getMessageOnTheQueue(long messageId);
 
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=683932&r1=683931&r2=683932&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Fri Aug  8 03:31:26 2008
@@ -1613,4 +1613,26 @@
             deliverAsync(_sub);
         }
     }
+
+    public List<Long> getMessagesOnTheQueue(int num)
+    {
+        return getMessagesOnTheQueue(num, 0);
+    }
+
+    public List<Long> getMessagesOnTheQueue(int num, int offset)
+    {
+        ArrayList<Long> ids = new ArrayList<Long>(num);
+        QueueEntryIterator it = _entries.iterator();
+        for (int i = 0; i < offset; i++)
+        {
+            it.advance();
+        }
+
+        for (int i = 0; i < num && !it.atTail(); i++)
+        {
+            it.advance();
+            ids.add(it.getNode().getMessage().getMessageId());
+        }
+        return ids;
+    }
 }
\ No newline at end of file

Added: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=683932&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
 Fri Aug  8 03:31:26 2008
@@ -0,0 +1,178 @@
+package org.apache.qpid.server.queue;
+
+import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import junit.framework.TestCase;
+
+public class SimpleAMQQueueTest extends TestCase
+{
+
+    private SimpleAMQQueue _queue;
+    private MessageStore store = new TestableMemoryMessageStore();
+    private TransactionalContext ctx = new NonTransactionalContext(store, new 
StoreContext(), null, null);
+    private MessageHandleFactory factory = new MessageHandleFactory();
+
+    MessagePublishInfo info = new MessagePublishInfo()
+    {
+
+        public AMQShortString getExchange()
+        {
+            return null;
+        }
+
+        public void setExchange(AMQShortString exchange)
+        {
+            //To change body of implemented methods use File | Settings | File 
Templates.
+        }
+
+        public boolean isImmediate()
+        {
+            return false;
+        }
+
+        public boolean isMandatory()
+        {
+            return false;
+        }
+
+        public AMQShortString getRoutingKey()
+        {
+            return null;
+        }
+    };
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        AMQShortString qname = new AMQShortString("qname");
+        AMQShortString owner = new AMQShortString("owner");
+        _queue = new SimpleAMQQueue(qname, false, owner, false, new 
VirtualHost("vhost", store));
+    }
+
+    public void testGetFirstMessageId() throws Exception
+    {
+        // Create message
+        Long messageId = new Long(23);
+        AMQMessage message = new TestMessage(messageId, messageId, info, new 
StoreContext());
+
+        // Put message on queue
+        _queue.enqueue(null, message);
+        // Get message id
+        Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
+
+        // Check message id
+        assertEquals("Message ID was wrong", messageId, testmsgid);
+    }
+
+    public void testGetFirstFiveMessageIds() throws Exception
+    {
+        for (int i = 0 ; i < 5; i++)
+        {
+            // Create message
+            Long messageId = new Long(i);
+            AMQMessage message = new TestMessage(messageId, messageId, info, 
new StoreContext());
+            // Put message on queue
+            _queue.enqueue(null, message);
+        }
+        // Get message ids
+        List<Long> msgids = _queue.getMessagesOnTheQueue(5);
+
+        // Check message id
+        for (int i = 0; i < 5; i++)
+        {
+            Long messageId = new Long(i);
+            assertEquals("Message ID was wrong", messageId, msgids.get(i));
+        }
+    }
+
+    public void testGetLastFiveMessageIds() throws Exception
+    {
+        for (int i = 0 ; i < 10; i++)
+        {
+            // Create message
+            Long messageId = new Long(i);
+            AMQMessage message = new TestMessage(messageId, messageId, info, 
new StoreContext());
+            // Put message on queue
+            _queue.enqueue(null, message);
+        }
+        // Get message ids
+        List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
+
+        // Check message id
+        for (int i = 0; i < 5; i++)
+        {
+            Long messageId = new Long(i+5);
+            assertEquals("Message ID was wrong", messageId, msgids.get(i));
+        }
+    }
+
+
+    // FIXME: move this to somewhere useful
+    private static AMQMessageHandle createMessageHandle(final long messageId, 
final MessagePublishInfo publishBody)
+    {
+        final AMQMessageHandle amqMessageHandle = (new 
MessageHandleFactory()).createMessageHandle(messageId,
+                                                                               
                    null,
+                                                                               
                    false);
+        try
+        {
+            amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),
+                                                              publishBody,
+                                                              new 
ContentHeaderBody()
+            {
+                public int getSize()
+                {
+                    return 1;
+                }
+            });
+        }
+        catch (AMQException e)
+        {
+            // won't happen
+        }
+
+
+        return amqMessageHandle;
+    }
+
+    public class TestMessage extends AMQMessage
+    {
+        private final long _tag;
+        private int _count;
+
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, 
StoreContext storeContext)
+                throws AMQException
+        {
+            super(createMessageHandle(messageId, publishBody), storeContext, 
publishBody);
+            _tag = tag;
+        }
+
+
+        public boolean incrementReference()
+        {
+            _count++;
+            return true;
+        }
+
+        public void decrementReference(StoreContext context)
+        {
+            _count--;
+        }
+
+        void assertCountEquals(int expected)
+        {
+            assertEquals("Wrong count for message with tag " + _tag, expected, 
_count);
+        }
+    }
+}


Reply via email to