Author: rgreig
Date: Sun Nov  5 14:33:50 2006
New Revision: 471546

URL: http://svn.apache.org/viewvc?view=rev&rev=471546
Log:
QPID-32. Some more updates. Note that this is still a work in progress and the 
broker does not currently work.

Modified:
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
    
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
    
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
    
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
    
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
    
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
    
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
 Sun Nov  5 14:33:50 2006
@@ -90,7 +90,7 @@
 
         public boolean hasNext()
         {
-            return _index < _messageHandle.getBodyCount();
+            return _index < _messageHandle.getBodyCount() - 1;
         }
 
         public AMQDataBlock next()
@@ -111,7 +111,7 @@
 
         public boolean hasNext()
         {
-            return _index < _messageHandle.getBodyCount();
+            return _index < _messageHandle.getBodyCount() - 1;
         }
 
         public ContentBody next()

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 Sun Nov  5 14:33:50 2006
@@ -683,7 +683,7 @@
             msg.checkDeliveredToConsumer();
             updateReceivedMessageCount(msg);
         }
-        catch(NoConsumersException e)
+        catch (NoConsumersException e)
         {
             // as this message will be returned, it should be removed
             // from the queue:

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Sun Nov  5 14:33:50 2006
@@ -15,6 +15,7 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 
 import java.util.List;
+import java.util.LinkedList;
 
 /**
  * @author Robert Greig ([EMAIL PROTECTED])
@@ -23,7 +24,7 @@
 {
     private ContentHeaderBody _contentHeaderBody;
 
-    private List<ContentBody> _contentBodies;
+    private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
 
     private boolean _redelivered;
 
@@ -52,12 +53,12 @@
 
     public long getBodySize()
     {
-        return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
+        return _contentHeaderBody.bodySize;
     }
 
     public ContentBody getContentBody(int index) throws 
IllegalArgumentException
     {
-        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+        return _contentBodies.get(index);
     }
 
     public void addContentBodyFrame(ContentBody contentBody) throws 
AMQException
@@ -67,7 +68,7 @@
 
     public String getExchangeName()
     {
-        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+        return null;
     }
 
     public String getRoutingKey()

Modified: 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
 Sun Nov  5 14:33:50 2006
@@ -34,12 +34,13 @@
      * @param base the base element identifier from which all configuration 
items are relative. For example, if the base
      * element is "store", the all elements used by concrete classes will be 
"store.foo" etc.
      * @param config the apache commons configuration object
+     * @throws Exception if an error occurs that means the store is unable to 
configure itself
      */
     void configure(QueueRegistry queueRegistry, String base, Configuration 
config) throws Exception;
 
     /**
      * Called to close and cleanup any resources used by the message store.
-     * @throws Exception
+     * @throws Exception if close fails
      */
     void close() throws Exception;
 

Modified: 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
 Sun Nov  5 14:33:50 2006
@@ -91,12 +91,12 @@
     private class Scenario
     {
         private final LinkedHashMap<Long, UnacknowledgedMessage> _messages = 
new LinkedHashMap<Long, UnacknowledgedMessage>();
-        private final UnacknowledgedMessageMap _map = new 
UnacknowledgedMessageMapImpl(_messages, _messages);
+        private final UnacknowledgedMessageMap _map = new 
UnacknowledgedMessageMapImpl(500);
         private final TxAck _op = new TxAck(_map);
         private final List<Long> _acked;
         private final List<Long> _unacked;
 
-        Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+        Scenario(int messageCount, List<Long> acked, List<Long> unacked) 
throws AMQException
         {
             for(int i = 0; i < messageCount; i++)
             {
@@ -162,10 +162,11 @@
         private final long _tag;
         private int _count;
 
-        TestMessage(long tag)
+        TestMessage(long tag) throws AMQException
         {
-            super(new TestableMemoryMessageStore(), null);
+            super(null); // new TestableMemoryMessageStore(), null);
             _tag = tag;
+            throw new AMQException("Fix this!");
         }
 
         public void incrementReference()

Modified: 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
 Sun Nov  5 14:33:50 2006
@@ -166,7 +166,9 @@
 
         private Message(BasicPublishBody publish, ContentHeaderBody header, 
List<ContentBody> bodies) throws AMQException
         {
-            super(_messageStore, publish, header, bodies);
+            //super(_messageStore, publish, header, bodies);
+            super(null);
+            throw new AMQException("Fix this!!!!");
         }
 
         private Message(AMQMessage msg) throws AMQException

Modified: 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
 Sun Nov  5 14:33:50 2006
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -77,7 +78,7 @@
             BasicPublishBody publishBody = new BasicPublishBody();
             publishBody.routingKey = "rk";
             publishBody.exchange = "someExchange";
-            AMQMessage msg = new AMQMessage(_messageStore, publishBody);
+            AMQMessage msg = null; //new AMQMessage(_messageStore, 
publishBody);             
             msg.setContentHeaderBody(new ContentHeaderBody());
             _subscription.send(msg, _queue);
         }
@@ -94,17 +95,25 @@
         final int msgCount = 10;
         publishMessages(msgCount);
 
-        Map<Long, UnacknowledgedMessage> map = 
_channel.getUnacknowledgedMessageMap();
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount);
-
-        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = 
map.entrySet().iterator();
-        for (int i = 1; i <= map.size(); i++)
+        
+        map.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
-            assertTrue(entry.getKey() == i);
-            UnacknowledgedMessage unackedMsg = entry.getValue();
-            assertTrue(unackedMsg.queue == _queue);
-        }
+            private int i = 1;
+
+            public boolean callback(UnacknowledgedMessage message) throws 
AMQException
+            {
+                assertTrue(message.deliveryTag == i++);
+                assertTrue(message.queue == _queue);
+                return false;
+            }
+
+            public void visitComplete()
+            {
+            }
+        });
+
         assertTrue(_messageStore.getMessageMap().size() == msgCount);
     }
 
@@ -119,7 +128,7 @@
         final int msgCount = 10;
         publishMessages(msgCount);
 
-        Map<Long, UnacknowledgedMessage> map = 
_channel.getUnacknowledgedMessageMap();
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
         assertTrue(_messageStore.getMessageMap().size() == 0);
     }
@@ -136,23 +145,29 @@
         publishMessages(msgCount);
 
         _channel.acknowledgeMessage(5, false);
-        Map<Long, UnacknowledgedMessage> map = 
_channel.getUnacknowledgedMessageMap();
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount - 1);
 
-        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = 
map.entrySet().iterator();
-        int i = 1;
-        while (i <= map.size())
+        map.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
-            assertTrue(entry.getKey() == i);
-            UnacknowledgedMessage unackedMsg = entry.getValue();
-            assertTrue(unackedMsg.queue == _queue);
-            // 5 is the delivery tag of the message that *should* be removed
-            if (++i == 5)
+            private int i = 1;
+
+            public boolean callback(UnacknowledgedMessage message) throws 
AMQException
             {
-                ++i;
+                assertTrue(message.deliveryTag == i);
+                assertTrue(message.queue == _queue);
+                // 5 is the delivery tag of the message that *should* be 
removed
+                if (++i == 5)
+                {
+                    ++i;
+                }
+                return false;
             }
-        }
+
+            public void visitComplete()
+            {
+            }
+        });        
     }
 
     /**
@@ -167,19 +182,25 @@
         publishMessages(msgCount);
 
         _channel.acknowledgeMessage(5, true);
-        Map<Long, UnacknowledgedMessage> map = 
_channel.getUnacknowledgedMessageMap();
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 5);
 
-        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = 
map.entrySet().iterator();
-        int i = 1;
-        while (i <= map.size())
+        map.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
-            assertTrue(entry.getKey() == i + 5);
-            UnacknowledgedMessage unackedMsg = entry.getValue();
-            assertTrue(unackedMsg.queue == _queue);
-            ++i;
-        }
+            private int i = 1;
+            
+            public boolean callback(UnacknowledgedMessage message) throws 
AMQException
+            {
+                assertTrue(message.deliveryTag == i + 5);
+                assertTrue(message.queue == _queue);
+                ++i;
+                return false;
+            }
+
+            public void visitComplete()
+            {
+            }
+        });        
     }
 
      /**
@@ -194,19 +215,25 @@
         publishMessages(msgCount);
 
         _channel.acknowledgeMessage(0, true);
-        Map<Long, UnacknowledgedMessage> map = 
_channel.getUnacknowledgedMessageMap();
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
 
-        Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = 
map.entrySet().iterator();
-        int i = 1;
-        while (i <= map.size())
+        map.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
-            assertTrue(entry.getKey() == i + 5);
-            UnacknowledgedMessage unackedMsg = entry.getValue();
-            assertTrue(unackedMsg.queue == _queue);
-            ++i;
-        }
+            private int i = 1;
+
+            public boolean callback(UnacknowledgedMessage message) throws 
AMQException
+            {
+                assertTrue(message.deliveryTag == i + 5);
+                assertTrue(message.queue == _queue);
+                ++i;
+                return false;
+            }
+
+            public void visitComplete()
+            {
+            }
+        });        
     }
 
 
@@ -222,7 +249,7 @@
         // at this point we should have sent out only 5 messages with a 
further 5 queued
         // up in the channel which should be suspended
         assertTrue(_subscription.isSuspended());
-        Map<Long, UnacknowledgedMessage> map = 
_channel.getUnacknowledgedMessageMap();
+        UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 5);
         _channel.acknowledgeMessage(5, true);
         assertTrue(!_subscription.isSuspended());

Modified: 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
 Sun Nov  5 14:33:50 2006
@@ -43,7 +43,8 @@
     {
         BasicPublishBody publish = new BasicPublishBody();
         publish.immediate = immediate;
-        return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), 
null);
+        //return new AMQMessage(_messageStore, publish, new 
ContentHeaderBody(), null);
+        throw new AMQException("Need to fix this!!!");
     }
 
 }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
 Sun Nov  5 14:33:50 2006
@@ -103,7 +103,9 @@
         {
             for (AMQQueue q : queues)
             {
-                q.deliver(new AMQMessage(messageStore, i, publish, header, 
body));
+                //q.deliver(new AMQMessage(messageStore, i, publish, header, 
body));
+                //q.process(new AMQMessage(messageStore, i, publish, header, 
body));
+                throw new AMQException("Need to fix this!!");
             }
         }
     }

Modified: 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
--- 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
 (original)
+++ 
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
 Sun Nov  5 14:33:50 2006
@@ -22,6 +22,8 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.AMQException;
 
 /**
@@ -43,22 +45,25 @@
     @Test
     public void testMessageGetsRemoved() throws AMQException
     {
-        AMQMessage message = new AMQMessage(_store, null);
+        throw new AMQException("Fix this!!!");
+        /*AMQMessage message = new AMQMessage(_store, null);
         _store.put(message);
         Assert.assertTrue(_store.getMessageMap().size() == 1);
         message.decrementReference();
         Assert.assertTrue(_store.getMessageMap().size() == 0);
+        */
     }
 
     @Test
     public void testMessageRemains() throws AMQException
     {
-        AMQMessage message = new AMQMessage(_store, null);
+        throw new AMQException("Fix me!!!");
+        /*AMQMessage message = new AMQMessage(_store, null);
         _store.put(message);
         Assert.assertTrue(_store.getMessageMap().size() == 1);
         message.incrementReference();
         message.decrementReference();
-        Assert.assertTrue(_store.getMessageMap().size() == 1);
+        Assert.assertTrue(_store.getMessageMap().size() == 1);*/
     }
 
     public static junit.framework.Test suite()


Reply via email to