Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 18 15:00:40 2007 @@ -58,9 +58,9 @@ private final Object _sessionKey; - private MessageQueue<AMQMessage> _messages; + private MessageQueue<QueueEntry> _messages; - private Queue<AMQMessage> _resendQueue; + private Queue<QueueEntry> _resendQueue; private final boolean _noLocal; @@ -160,7 +160,7 @@ if (filtersMessages()) { - _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); + _messages = new ConcurrentLinkedMessageQueueAtomicSize<QueueEntry>(); } else { @@ -226,7 +226,7 @@ * * @throws AMQException */ - public void send(AMQMessage msg, AMQQueue queue) throws AMQException + public void send(QueueEntry msg, AMQQueue queue) throws AMQException { if (msg != null) { @@ -245,7 +245,7 @@ } } - private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException + private void sendToBrowser(QueueEntry msg, AMQQueue queue) throws AMQException { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -266,11 +266,11 @@ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); } - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(msg.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); } } - private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) + private void sendToConsumer(StoreContext storeContext, QueueEntry entry, AMQQueue queue) throws AMQException { try @@ -287,9 +287,9 @@ { if (_logger.isDebugEnabled()) { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId()); } - queue.dequeue(storeContext, msg); + queue.dequeue(storeContext, entry); } synchronized (channel) @@ -298,19 +298,19 @@ if (_sendLock.get()) { - _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); } if (_acks) { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag); } - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); if (!_acks) { - msg.decrementReference(storeContext); + entry.getMessage().decrementReference(storeContext); } } } @@ -320,7 +320,7 @@ // using a try->finally would set it even if an error occured. // Is this what we want? - msg.setDeliveredToConsumer(); + entry.setDeliveredToConsumer(); } } @@ -355,19 +355,19 @@ return _filters != null || _noLocal; } - public boolean hasInterest(AMQMessage msg) + public boolean hasInterest(QueueEntry entry) { //check that the message hasn't been rejected - if (msg.isRejectedBy(this)) + if (entry.isRejectedBy(this)) { if (_logger.isDebugEnabled()) { - _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity()); + _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); } // return false; } - final AMQProtocolSession publisher = msg.getPublisher(); + final AMQProtocolSession publisher = entry.getMessage().getPublisher(); //todo - client id should be recoreded and this test removed but handled below if (_noLocal && publisher != null) @@ -418,9 +418,9 @@ if (_logger.isTraceEnabled()) { - _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity()); + _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); } - return checkFilters(msg); + return checkFilters(entry); } @@ -431,7 +431,7 @@ return id; } - private boolean checkFilters(AMQMessage msg) + private boolean checkFilters(QueueEntry msg) { if (_filters != null) { @@ -439,7 +439,7 @@ // { // _logger.trace("(" + debugIdentity() + ") has filters."); // } - return _filters.allAllow(msg); + return _filters.allAllow(msg.getMessage()); } else { @@ -452,12 +452,12 @@ } } - public Queue<AMQMessage> getPreDeliveryQueue() + public Queue<QueueEntry> getPreDeliveryQueue() { return _messages; } - public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) + public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst) { if (_messages != null) { @@ -561,19 +561,19 @@ while (!_resendQueue.isEmpty()) { - AMQMessage resent = _resendQueue.poll(); + QueueEntry resent = _resendQueue.poll(); if (_logger.isTraceEnabled()) { _logger.trace("Removed for resending:" + resent.debugIdentity()); } - resent.release(_queue); + resent.release(); _queue.subscriberHasPendingResend(false, this, resent); try { - channel.getTransactionalContext().deliver(resent, _queue, true); + channel.getTransactionalContext().deliver(resent, true); } catch (AMQException e) { @@ -611,22 +611,22 @@ return _isBrowser; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(QueueEntry msg) { - return channel.wouldSuspend(msg); + return channel.wouldSuspend(msg.getMessage()); } - public Queue<AMQMessage> getResendQueue() + public Queue<QueueEntry> getResendQueue() { if (_resendQueue == null) { - _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + _resendQueue = new ConcurrentLinkedQueueAtomicSize<QueueEntry>(); } return _resendQueue; } - public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages) { if (_resendQueue != null && !_resendQueue.isEmpty()) { @@ -651,7 +651,7 @@ } } - public void addToResendQueue(AMQMessage msg) + public void addToResendQueue(QueueEntry msg) { // add to our resend queue getResendQueue().add(msg);
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 18 15:00:40 2007 @@ -30,5 +30,5 @@ { public List<Subscription> getSubscriptions(); public boolean hasActiveSubscribers(); - public Subscription nextSubscriber(AMQMessage msg); + public Subscription nextSubscriber(QueueEntry entry); } Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 18 15:00:40 2007 @@ -113,7 +113,7 @@ * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning. */ - public Subscription nextSubscriber(AMQMessage msg) + public Subscription nextSubscriber(QueueEntry msg) { if (_subscriptions.isEmpty()) { @@ -140,7 +140,7 @@ } } - private Subscription nextSubscriberImpl(AMQMessage msg) + private Subscription nextSubscriberImpl(QueueEntry msg) { final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber); while (iterator.hasNext()) Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Tue Dec 18 15:00:40 2007 @@ -30,6 +30,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; @@ -64,14 +65,13 @@ private static class DeliveryDetails { - public AMQMessage message; - public AMQQueue queue; + public QueueEntry entry; + private boolean deliverFirst; - public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst) + public DeliveryDetails(QueueEntry entry, boolean deliverFirst) { - this.message = message; - this.queue = queue; + this.entry = entry; this.deliverFirst = deliverFirst; } } @@ -103,7 +103,7 @@ _postCommitDeliveryList.clear(); } - public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException + public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException { // A publication will result in the enlisting of several // TxnOps. The first is an op that will store the message. @@ -112,9 +112,9 @@ // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. // message.incrementReference(); - _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); + _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst)); _messageDelivered = true; - _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); + _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages)); /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); if (_log.isDebugEnabled()) { @@ -242,11 +242,11 @@ { for (DeliveryDetails dd : _postCommitDeliveryList) { - dd.queue.process(_storeContext, dd.message, dd.deliverFirst); + dd.entry.process(_storeContext, dd.deliverFirst); try { - dd.message.checkDeliveredToConsumer(); + dd.entry.checkDeliveredToConsumer(); } catch (NoConsumersException nce) { Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Dec 18 15:00:40 2007 @@ -34,6 +34,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; @@ -92,14 +93,14 @@ // Does not apply to this context } - public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException + public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException { try { - queue.process(_storeContext, message, deliverFirst); + entry.process(_storeContext, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: - message.checkDeliveredToConsumer(); + entry.checkDeliveredToConsumer(); } catch (NoConsumersException e) { @@ -128,7 +129,7 @@ { if (_log.isDebugEnabled()) { - _log.debug("Discarding message: " + message.message.getMessageId()); + _log.debug("Discarding message: " + message.getMessage().getMessageId()); } //Message has been ack so discard it. This will dequeue and decrement the reference. @@ -162,7 +163,7 @@ { if (_log.isDebugEnabled()) { - _log.debug("Discarding message: " + msg.message.getMessageId()); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } //Message has been ack so discard it. This will dequeue and decrement the reference. @@ -192,7 +193,7 @@ { if (_log.isDebugEnabled()) { - _log.debug("Discarding message: " + msg.message.getMessageId()); + _log.debug("Discarding message: " + msg.getMessage().getMessageId()); } //Message has been ack so discard it. This will dequeue and decrement the reference. @@ -206,7 +207,7 @@ if (_log.isDebugEnabled()) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.message.getMessageId()); + msg.getMessage().getMessageId()); } } } Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Tue Dec 18 15:00:40 2007 @@ -25,6 +25,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; /** @@ -111,14 +112,13 @@ * * <p/>This is an 'enqueue' operation. * - * @param message The message to deliver. - * @param queue The queue to deliver the message to. + * @param entry The message to deliver, and the queue to deliver to. * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt> * for normal FIFO message ordering. * * @throws AMQException If the message cannot be delivered for any reason. */ - void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException; + void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException; /** * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Tue Dec 18 15:00:40 2007 @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -85,7 +86,7 @@ } - protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting, + protected List<List> createMessageData(java.util.List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting, boolean showMessageHeaders) { @@ -96,8 +97,9 @@ display.add(hex); display.add(ascii); - for (AMQMessage msg : messages) + for (QueueEntry entry : messages) { + AMQMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; @@ -252,8 +254,8 @@ private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg, String title, boolean routing, boolean headers, boolean messageHeaders) { - List<AMQMessage> single = new LinkedList<AMQMessage>(); - single.add(msg); + List<QueueEntry> single = new LinkedList<QueueEntry>(); + single.add(new QueueEntry(null,msg)); List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders); Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Tue Dec 18 15:00:40 2007 @@ -23,6 +23,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.tools.messagestore.MessageStoreTool; @@ -166,12 +167,12 @@ if (fromQueue != null) { - List<AMQMessage> messages = fromQueue.getMessagesOnTheQueue(); + List<QueueEntry> messages = fromQueue.getMessagesOnTheQueue(); if (messages != null) { - for (AMQMessage msg : messages) + for (QueueEntry msg : messages) { - ids.add(msg.getMessageId()); + ids.add(msg.getMessage().getMessageId()); } } } Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Tue Dec 18 15:00:40 2007 @@ -27,6 +27,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -114,7 +115,7 @@ if (_queue != null) { - List<AMQMessage> messages = _queue.getMessagesOnTheQueue(); + List<QueueEntry> messages = _queue.getMessagesOnTheQueue(); if (messages == null || messages.size() == 0) { _console.println("No messages on queue"); @@ -153,7 +154,7 @@ * @param showMessageHeaders show the msg headers be shown * @return the formated data lists for printing */ - protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting, + protected List<List> createMessageData(List<Long> msgids, List<QueueEntry> messages, boolean showHeaders, boolean showRouting, boolean showMessageHeaders) { @@ -334,8 +335,9 @@ } //Add create the table of data - for (AMQMessage msg : messages) + for (QueueEntry entry : messages) { + AMQMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Tue Dec 18 15:00:40 2007 @@ -104,7 +104,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -146,7 +146,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -166,7 +166,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -207,7 +207,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -227,7 +227,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -247,7 +247,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -266,7 +266,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -308,7 +308,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -327,7 +327,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -369,7 +369,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -403,7 +403,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -446,7 +446,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -487,7 +487,7 @@ Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0).getMessage()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Dec 18 15:00:40 2007 @@ -286,7 +286,7 @@ for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i], false); + _queue.process(_storeContext, new QueueEntry(_queue,messages[i]), false); } } Modified: incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original) +++ incubator/qpid/branches/M2.1/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Dec 18 15:00:40 2007 @@ -211,7 +211,7 @@ msg.enqueue(_queue); msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - _queue.process(_storeContext, msg, false); + _queue.process(_storeContext, new QueueEntry(_queue, msg), false); _queueMBean.viewMessageContent(id); try { Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Tue Dec 18 15:00:40 2007 @@ -29,6 +29,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; @@ -133,7 +134,7 @@ }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag)); + _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag)); } _acked = acked; _unacked = unacked; @@ -150,7 +151,7 @@ { UnacknowledgedMessage u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); - ((TestMessage) u.message).assertCountEquals(expected); + ((TestMessage) u.getMessage()).assertCountEquals(expected); } } Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Dec 18 15:00:40 2007 @@ -27,6 +27,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; @@ -250,9 +251,9 @@ * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException + public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException { - messages.add(new HeadersExchangeTest.Message(msg)); + messages.add(new HeadersExchangeTest.Message(msg.getMessage())); } } Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Dec 18 15:00:40 2007 @@ -142,7 +142,7 @@ msg.incrementReference(); msg.routingComplete(_messageStore, _storeContext, factory); // we manually send the message to the subscription - _subscription.send(msg, _queue); + _subscription.send(new QueueEntry(_queue,msg), _queue); } } @@ -167,7 +167,7 @@ assertTrue(deliveryTag == i); i++; UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); } assertTrue(map.size() == msgCount); @@ -228,7 +228,7 @@ { assertTrue(deliveryTag == i); UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) { @@ -257,7 +257,7 @@ { assertTrue(deliveryTag == i + 5); UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); ++i; } } @@ -281,7 +281,7 @@ { assertTrue(deliveryTag == i + 5); UnacknowledgedMessage unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.queue == _queue); + assertTrue(unackedMsg.getQueue() == _queue); ++i; } } Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java Tue Dec 18 15:00:40 2007 @@ -42,9 +42,9 @@ private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>(); private final Set<Subscription> _active = new HashSet<Subscription>(); - private final List<AMQMessage> _messages = new ArrayList<AMQMessage>(); + private final List<QueueEntry> _messages = new ArrayList<QueueEntry>(); private int next = 0;//index to next message to send - private final List<AMQMessage> _received = Collections.synchronizedList(new ArrayList<AMQMessage>()); + private final List<QueueEntry> _received = Collections.synchronizedList(new ArrayList<QueueEntry>()); private final Executor _executor = new OnCurrentThreadExecutor(); private final List<Thread> _threads = new ArrayList<Thread>(); @@ -159,7 +159,7 @@ } } - private AMQMessage nextMessage() + private QueueEntry nextMessage() { synchronized (_messages) { @@ -191,7 +191,7 @@ { void doRun() throws Throwable { - AMQMessage msg = nextMessage(); + QueueEntry msg = nextMessage(); if (msg != null) { _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Dec 18 15:00:40 2007 @@ -40,7 +40,7 @@ public void testStartInQueueingMode() throws AMQException { - AMQMessage[] messages = new AMQMessage[10]; + QueueEntry[] messages = new QueueEntry[10]; for (int i = 0; i < messages.length; i++) { messages[i] = message(); @@ -85,7 +85,7 @@ public void testStartInDirectMode() throws AMQException { - AMQMessage[] messages = new AMQMessage[10]; + QueueEntry[] messages = new QueueEntry[10]; for (int i = 0; i < messages.length; i++) { messages[i] = message(); @@ -132,7 +132,7 @@ { try { - AMQMessage msg = message(true); + QueueEntry msg = message(true); _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); @@ -154,7 +154,7 @@ SubscriptionTestHelper s = new SubscriptionTestHelper("A"); _subscriptions.addSubscriber(s); s.setSuspended(true); - AMQMessage msg = message(true); + QueueEntry msg = message(true); _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Tue Dec 18 15:00:40 2007 @@ -55,12 +55,12 @@ ApplicationRegistry.initialise(new NullApplicationRegistry()); } - AMQMessage message() throws AMQException + QueueEntry message() throws AMQException { return message(false); } - AMQMessage message(final boolean immediate) throws AMQException + QueueEntry message(final boolean immediate) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -86,8 +86,8 @@ } }; - return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody()); + return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, + new ContentHeaderBody())); } } Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=605352&r1=605351&r2=605352&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original) +++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue Dec 18 15:00:40 2007 @@ -28,13 +28,13 @@ public class SubscriptionTestHelper implements Subscription { - private final List<AMQMessage> messages; + private final List<QueueEntry> messages; private final Object key; private boolean isSuspended; public SubscriptionTestHelper(Object key) { - this(key, new ArrayList<AMQMessage>()); + this(key, new ArrayList<QueueEntry>()); } public SubscriptionTestHelper(final Object key, final boolean isSuspended) @@ -43,18 +43,18 @@ setSuspended(isSuspended); } - SubscriptionTestHelper(Object key, List<AMQMessage> messages) + SubscriptionTestHelper(Object key, List<QueueEntry> messages) { this.key = key; this.messages = messages; } - List<AMQMessage> getMessages() + List<QueueEntry> getMessages() { return messages; } - public void send(AMQMessage msg, AMQQueue queue) + public void send(QueueEntry msg, AMQQueue queue) { messages.add(msg); } @@ -69,12 +69,12 @@ return isSuspended; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(QueueEntry msg) { return isSuspended; } - public void addToResendQueue(AMQMessage msg) + public void addToResendQueue(QueueEntry msg) { //no-op } @@ -98,27 +98,27 @@ return false; } - public boolean hasInterest(AMQMessage msg) + public boolean hasInterest(QueueEntry msg) { return true; } - public Queue<AMQMessage> getPreDeliveryQueue() + public Queue<QueueEntry> getPreDeliveryQueue() { return null; } - public Queue<AMQMessage> getResendQueue() + public Queue<QueueEntry> getResendQueue() { return null; } - public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages) { return messages; } - public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) + public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst) { //no-op }
