Author: rgreig
Date: Sun Nov 5 14:33:50 2006
New Revision: 471546
URL: http://svn.apache.org/viewvc?view=rev&rev=471546
Log:
QPID-32. Some more updates. Note that this is still a work in progress and the
broker does not currently work.
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQMessage.java
Sun Nov 5 14:33:50 2006
@@ -90,7 +90,7 @@
public boolean hasNext()
{
- return _index < _messageHandle.getBodyCount();
+ return _index < _messageHandle.getBodyCount() - 1;
}
public AMQDataBlock next()
@@ -111,7 +111,7 @@
public boolean hasNext()
{
- return _index < _messageHandle.getBodyCount();
+ return _index < _messageHandle.getBodyCount() - 1;
}
public ContentBody next()
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
Sun Nov 5 14:33:50 2006
@@ -683,7 +683,7 @@
msg.checkDeliveredToConsumer();
updateReceivedMessageCount(msg);
}
- catch(NoConsumersException e)
+ catch (NoConsumersException e)
{
// as this message will be returned, it should be removed
// from the queue:
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Sun Nov 5 14:33:50 2006
@@ -15,6 +15,7 @@
import org.apache.qpid.framing.ContentHeaderBody;
import java.util.List;
+import java.util.LinkedList;
/**
* @author Robert Greig ([EMAIL PROTECTED])
@@ -23,7 +24,7 @@
{
private ContentHeaderBody _contentHeaderBody;
- private List<ContentBody> _contentBodies;
+ private List<ContentBody> _contentBodies = new LinkedList<ContentBody>();
private boolean _redelivered;
@@ -52,12 +53,12 @@
public long getBodySize()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _contentHeaderBody.bodySize;
}
public ContentBody getContentBody(int index) throws
IllegalArgumentException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _contentBodies.get(index);
}
public void addContentBodyFrame(ContentBody contentBody) throws
AMQException
@@ -67,7 +68,7 @@
public String getExchangeName()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public String getRoutingKey()
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/store/MessageStore.java
Sun Nov 5 14:33:50 2006
@@ -34,12 +34,13 @@
* @param base the base element identifier from which all configuration
items are relative. For example, if the base
* element is "store", the all elements used by concrete classes will be
"store.foo" etc.
* @param config the apache commons configuration object
+ * @throws Exception if an error occurs that means the store is unable to configure itself
*/
void configure(QueueRegistry queueRegistry, String base, Configuration
config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
- * @throws Exception
+ * @throws Exception if close fails
*/
void close() throws Exception;
Modified:
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/ack/TxAckTest.java
Sun Nov 5 14:33:50 2006
@@ -91,12 +91,12 @@
private class Scenario
{
private final LinkedHashMap<Long, UnacknowledgedMessage> _messages =
new LinkedHashMap<Long, UnacknowledgedMessage>();
- private final UnacknowledgedMessageMap _map = new
UnacknowledgedMessageMapImpl(_messages, _messages);
+ private final UnacknowledgedMessageMap _map = new
UnacknowledgedMessageMapImpl(500);
private final TxAck _op = new TxAck(_map);
private final List<Long> _acked;
private final List<Long> _unacked;
- Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+ Scenario(int messageCount, List<Long> acked, List<Long> unacked)
throws AMQException
{
for(int i = 0; i < messageCount; i++)
{
@@ -162,10 +162,11 @@
private final long _tag;
private int _count;
- TestMessage(long tag)
+ TestMessage(long tag) throws AMQException
{
- super(new TestableMemoryMessageStore(), null);
+ super(null); // new TestableMemoryMessageStore(), null);
_tag = tag;
+ throw new AMQException("Fix this!");
}
public void incrementReference()
Modified:
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java
Sun Nov 5 14:33:50 2006
@@ -166,7 +166,9 @@
private Message(BasicPublishBody publish, ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
{
- super(_messageStore, publish, header, bodies);
+ //super(_messageStore, publish, header, bodies);
+ super(null);
+ throw new AMQException("Fix this!!!!");
}
private Message(AMQMessage msg) throws AMQException
Modified:
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
Sun Nov 5 14:33:50 2006
@@ -27,6 +27,7 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -77,7 +78,7 @@
BasicPublishBody publishBody = new BasicPublishBody();
publishBody.routingKey = "rk";
publishBody.exchange = "someExchange";
- AMQMessage msg = new AMQMessage(_messageStore, publishBody);
+ AMQMessage msg = null; //new AMQMessage(_messageStore, publishBody);
msg.setContentHeaderBody(new ContentHeaderBody());
_subscription.send(msg, _queue);
}
@@ -94,17 +95,25 @@
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, UnacknowledgedMessage> map =
_channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
-
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it =
map.entrySet().iterator();
- for (int i = 1; i <= map.size(); i++)
+
+ map.visit(new UnacknowledgedMessageMap.Visitor()
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- }
+ private int i = 1;
+
+ public boolean callback(UnacknowledgedMessage message) throws
AMQException
+ {
+ assertTrue(message.deliveryTag == i++);
+ assertTrue(message.queue == _queue);
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+
assertTrue(_messageStore.getMessageMap().size() == msgCount);
}
@@ -119,7 +128,7 @@
final int msgCount = 10;
publishMessages(msgCount);
- Map<Long, UnacknowledgedMessage> map =
_channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
assertTrue(_messageStore.getMessageMap().size() == 0);
}
@@ -136,23 +145,29 @@
publishMessages(msgCount);
_channel.acknowledgeMessage(5, false);
- Map<Long, UnacknowledgedMessage> map =
_channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount - 1);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it =
map.entrySet().iterator();
- int i = 1;
- while (i <= map.size())
+ map.visit(new UnacknowledgedMessageMap.Visitor()
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- // 5 is the delivery tag of the message that *should* be removed
- if (++i == 5)
+ private int i = 1;
+
+ public boolean callback(UnacknowledgedMessage message) throws
AMQException
{
- ++i;
+ assertTrue(message.deliveryTag == i);
+ assertTrue(message.queue == _queue);
+ // 5 is the delivery tag of the message that *should* be removed
+ if (++i == 5)
+ {
+ ++i;
+ }
+ return false;
}
- }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -167,19 +182,25 @@
publishMessages(msgCount);
_channel.acknowledgeMessage(5, true);
- Map<Long, UnacknowledgedMessage> map =
_channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it =
map.entrySet().iterator();
- int i = 1;
- while (i <= map.size())
+ map.visit(new UnacknowledgedMessageMap.Visitor()
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i + 5);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- ++i;
- }
+ private int i = 1;
+
+ public boolean callback(UnacknowledgedMessage message) throws
AMQException
+ {
+ assertTrue(message.deliveryTag == i + 5);
+ assertTrue(message.queue == _queue);
+ ++i;
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
}
/**
@@ -194,19 +215,25 @@
publishMessages(msgCount);
_channel.acknowledgeMessage(0, true);
- Map<Long, UnacknowledgedMessage> map =
_channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- Iterator<Map.Entry<Long, UnacknowledgedMessage>> it =
map.entrySet().iterator();
- int i = 1;
- while (i <= map.size())
+ map.visit(new UnacknowledgedMessageMap.Visitor()
{
- Map.Entry<Long, UnacknowledgedMessage> entry = it.next();
- assertTrue(entry.getKey() == i + 5);
- UnacknowledgedMessage unackedMsg = entry.getValue();
- assertTrue(unackedMsg.queue == _queue);
- ++i;
- }
+ private int i = 1;
+
+ public boolean callback(UnacknowledgedMessage message) throws
AMQException
+ {
+ assertTrue(message.deliveryTag == i + 5);
+ assertTrue(message.queue == _queue);
+ ++i;
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
}
@@ -222,7 +249,7 @@
// at this point we should have sent out only 5 messages with a
further 5 queued
// up in the channel which should be suspended
assertTrue(_subscription.isSuspended());
- Map<Long, UnacknowledgedMessage> map =
_channel.getUnacknowledgedMessageMap();
+ UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
_channel.acknowledgeMessage(5, true);
assertTrue(!_subscription.isSuspended());
Modified:
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/MessageTestHelper.java
Sun Nov 5 14:33:50 2006
@@ -43,7 +43,8 @@
{
BasicPublishBody publish = new BasicPublishBody();
publish.immediate = immediate;
- return new AMQMessage(_messageStore, publish, new ContentHeaderBody(),
null);
+ //return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null);
+ throw new AMQException("Need to fix this!!!");
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/SendPerfTest.java
Sun Nov 5 14:33:50 2006
@@ -103,7 +103,9 @@
{
for (AMQQueue q : queues)
{
- q.deliver(new AMQMessage(messageStore, i, publish, header,
body));
+ //q.deliver(new AMQMessage(messageStore, i, publish, header, body));
+ //q.process(new AMQMessage(messageStore, i, publish, header, body));
+ throw new AMQException("Need to fix this!!");
}
}
}
Modified:
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=471546&r1=471545&r2=471546
==============================================================================
---
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
(original)
+++
incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/store/TestReferenceCounting.java
Sun Nov 5 14:33:50 2006
@@ -22,6 +22,8 @@
import org.junit.Assert;
import org.junit.Before;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.AMQException;
/**
@@ -43,22 +45,25 @@
@Test
public void testMessageGetsRemoved() throws AMQException
{
- AMQMessage message = new AMQMessage(_store, null);
+ throw new AMQException("Fix this!!!");
+ /*AMQMessage message = new AMQMessage(_store, null);
_store.put(message);
Assert.assertTrue(_store.getMessageMap().size() == 1);
message.decrementReference();
Assert.assertTrue(_store.getMessageMap().size() == 0);
+ */
}
@Test
public void testMessageRemains() throws AMQException
{
- AMQMessage message = new AMQMessage(_store, null);
+ throw new AMQException("Fix me!!!");
+ /*AMQMessage message = new AMQMessage(_store, null);
_store.put(message);
Assert.assertTrue(_store.getMessageMap().size() == 1);
message.incrementReference();
message.decrementReference();
- Assert.assertTrue(_store.getMessageMap().size() == 1);
+ Assert.assertTrue(_store.getMessageMap().size() == 1);*/
}
public static junit.framework.Test suite()