Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Jan 5 15:16:50 2007 @@ -27,6 +27,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.StoreContext; import javax.management.JMException; import java.util.LinkedList; @@ -41,9 +42,11 @@ private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; private MessageStore _messageStore = new SkeletonMessageStore(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null, + private StoreContext _storeContext = new StoreContext(); + private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + new HashSet<Long>()); private MockProtocolSession _protocolSession; private AMQChannel _channel; @@ -140,8 +143,8 @@ AMQMessage msg = message(false); long id = msg.getMessageId(); - _queue.clearQueue(); - _queue.process(msg); + _queue.clearQueue(_storeContext); + _queue.process(_storeContext, msg); _queueMBean.viewMessageContent(id); try { @@ -161,7 +164,7 @@ BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = 1000; // in bytes + contentHeaderBody.bodySize = 1000; // in bytes return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); } @@ -184,7 +187,7 @@ } for (int i = 0; i < messageCount; i++) { - _queue.process(messages[i]); + _queue.process(_storeContext, messages[i]); } } }
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Jan 5 15:16:50 2007 @@ -32,6 +32,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.TestApplicationRegistry; @@ -53,6 +54,8 @@ private TestableMemoryMessageStore _messageStore; + private StoreContext _storeContext = new StoreContext(); + private AMQChannel _channel; private SubscriptionSet _subscriptionManager; @@ -82,7 +85,7 @@ private void publishMessages(int count, boolean persistent) throws AMQException { - TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null, + TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); MessageHandleFactory factory = new MessageHandleFactory(); @@ -111,7 +114,7 @@ // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription msg.incrementReference(); - msg.routingComplete(_messageStore, factory); + msg.routingComplete(_messageStore, _storeContext, factory); // we manually send the message to the subscription _subscription.send(msg, _queue); } Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Fri Jan 5 15:16:50 2007 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,7 +186,7 @@ AMQMessage msg = nextMessage(); if (msg != null) { - _deliveryMgr.deliver(toString(), msg); + _deliveryMgr.deliver(null, toString(), msg); } } } Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Fri Jan 5 15:16:50 2007 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import junit.framework.TestSuite; @@ -29,6 +30,7 @@ { protected final SubscriptionSet _subscriptions = new SubscriptionSet(); protected DeliveryManager _mgr; + protected StoreContext _storeContext = new StoreContext(); public DeliveryManagerTest() throws Exception { @@ -45,7 +47,7 @@ for (int i = 0; i < batch; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); @@ -55,7 +57,7 @@ for (int i = batch; i < messages.length; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } assertTrue(s1.getMessages().isEmpty()); @@ -93,7 +95,7 @@ for (int i = 0; i < batch; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } assertEquals(batch, s1.getMessages().size()); @@ -107,7 +109,7 @@ s1.setSuspended(true); for (int i = batch; i < messages.length; i++) { - _mgr.deliver("Me", messages[i]); + _mgr.deliver(_storeContext, "Me", messages[i]); } _mgr.processAsync(new OnCurrentThreadExecutor()); @@ -129,7 +131,7 @@ try { AMQMessage msg = message(true); - _mgr.deliver("Me", msg); + _mgr.deliver(_storeContext, "Me", msg); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -151,7 +153,7 @@ _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); - _mgr.deliver("Me", msg); + _mgr.deliver(_storeContext, "Me", msg); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java Fri Jan 5 15:16:50 2007 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.txn.TransactionalContext; @@ -40,7 +41,9 @@ { private final MessageStore _messageStore = new SkeletonMessageStore(); - private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null, + private final StoreContext _storeContext = new StoreContext(); + + private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>(), new HashSet<Long>()); @@ -61,7 +64,7 @@ BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody()); + new ContentHeaderBody()); } } Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Fri Jan 5 15:16:50 2007 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,9 +48,9 @@ public void close() throws Exception { - } + } - public void removeMessage(long messageId) + public void removeMessage(StoreContext s, long messageId) { } @@ -62,28 +62,28 @@ { } - public void enqueueMessage(String name, long messageId) throws AMQException + public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException { } - public void dequeueMessage(String name, long messageId) throws AMQException + public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException { } - public void beginTran() throws AMQException + public void beginTran(StoreContext s) throws AMQException { } - public boolean inTran() + public boolean inTran(StoreContext sc) { return false; } - - public void commitTran() throws AMQException + + public void commitTran(StoreContext storeContext) throws AMQException { } - public void abortTran() throws AMQException + public void abortTran(StoreContext storeContext) throws AMQException { } @@ -97,12 +97,12 @@ return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException { } - public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException { } Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Jan 5 15:16:50 2007 @@ -22,9 +22,9 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -36,6 +36,8 @@ { private TestableMemoryMessageStore _store; + private StoreContext _storeContext = new StoreContext(); + protected void setUp() throws Exception { super.setUp(); @@ -48,14 +50,16 @@ public void testMessageGetsRemoved() throws AMQException { createPersistentContentHeader(); - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(), - new NonTransactionalContext(_store, null, null, null), + // TODO: fix hardcoded protocol version data + AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, + (byte)0), + new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); // we call routing complete to set up the handle - message.routingComplete(_store, new MessageHandleFactory()); + message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertTrue(_store.getMessageMetaDataMap().size() == 1); - message.decrementReference(); + message.decrementReference(_storeContext); assertTrue(_store.getMessageMetaDataMap().size() == 0); } @@ -70,15 +74,17 @@ public void testMessageRemains() throws AMQException { - AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(), - new NonTransactionalContext(_store, null, null, null), + // TODO: fix hardcoded protocol version data + AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, + (byte)0), + new NonTransactionalContext(_store, _storeContext, null, null, null), createPersistentContentHeader()); message.incrementReference(); // we call routing complete to set up the handle - message.routingComplete(_store, new MessageHandleFactory()); + message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertTrue(_store.getMessageMetaDataMap().size() == 1); message.incrementReference(); - message.decrementReference(); + message.decrementReference(_storeContext); assertTrue(_store.getMessageMetaDataMap().size() == 1); } Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java?view=diff&rev=493231&r1=493230&r2=493231 ============================================================================== --- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java (original) +++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java Fri Jan 5 15:16:50 2007 @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import java.util.LinkedList; @@ -43,7 +44,7 @@ buffer.enlist(op); buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); @@ -58,7 +59,7 @@ buffer.enlist(new MockOp().expectRollback()); buffer.enlist(new MockOp().expectRollback()); - buffer.rollback(); + buffer.rollback(null); validateOps(); store.validate(); @@ -77,7 +78,7 @@ buffer.enlist(new FailedPrepare()); buffer.enlist(new MockOp()); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); } @@ -95,7 +96,7 @@ buffer.enlist(new StoreMessageOperation(store)); buffer.enlist(new TxnTester(store)); - buffer.commit(); + buffer.commit(null); validateOps(); store.validate(); } @@ -127,12 +128,12 @@ ops.add(this); } - public void prepare() + public void prepare(StoreContext context) { assertEquals(expected.removeLast(), PREPARE); } - public void commit() + public void commit(StoreContext context) { assertEquals(expected.removeLast(), COMMIT); } @@ -142,7 +143,7 @@ assertEquals(expected.removeLast(), UNDO_PREPARE); } - public void rollback() + public void rollback(StoreContext context) { assertEquals(expected.removeLast(), ROLLBACK); } @@ -249,16 +250,16 @@ class NullOp implements TxnOp { - public void prepare() throws AMQException + public void prepare(StoreContext context) throws AMQException { } - public void commit() + public void commit(StoreContext context) { } public void undoPrepare() { } - public void rollback() + public void rollback(StoreContext context) { } } @@ -275,6 +276,8 @@ { private final MessageStore store; + private final StoreContext context = new StoreContext(); + TxnTester(MessageStore store) { this.store = store; @@ -282,12 +285,12 @@ public void prepare() throws AMQException { - assertTrue("Expected prepare to be performed under txn", store.inTran()); + assertTrue("Expected prepare to be performed under txn", store.inTran(context)); } public void commit() { - assertTrue("Expected commit not to be performed under txn", !store.inTran()); + assertTrue("Expected commit not to be performed under txn", !store.inTran(context)); } }
