Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Thu Jun 19 02:01:59 2008 @@ -23,24 +23,26 @@ import junit.framework.TestCase; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; +import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; -import java.util.HashSet; import java.util.LinkedList; import java.util.Set; +import java.util.Collections; /** * Tests that acknowledgements are handled correctly. @@ -49,7 +51,7 @@ { private static final Logger _log = Logger.getLogger(AckTest.class); - private SubscriptionImpl _subscription; + private Subscription _subscription; private MockProtocolSession _protocolSession; @@ -57,9 +59,7 @@ private StoreContext _storeContext = new StoreContext(); - private AMQChannel _channel; - - private SubscriptionSet _subscriptionManager; + private AMQChannel _channel; private AMQQueue _queue; @@ -75,11 +75,13 @@ super.setUp(); _messageStore = new TestableMemoryMessageStore(); _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession, 5, _messageStore); + _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); _protocolSession.addChannel(_channel); - _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), _subscriptionManager); + + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), + null); + } private void publishMessages(int count) throws AMQException @@ -92,6 +94,7 @@ TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + _queue.registerSubscription(_subscription,false); MessageHandleFactory factory = new MessageHandleFactory(); for (int i = 1; i <= count; i++) { @@ -125,7 +128,8 @@ return new AMQShortString("rk"); } }; - AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext); + IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); + //IncomingMessage msg2 = null; if (persistent) { BasicContentHeaderProperties b = new BasicContentHeaderProperties(); @@ -142,10 +146,14 @@ // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.incrementReference(); - msg.routingComplete(_messageStore, _storeContext, factory); + msg.enqueue(Collections.singleton(_queue)); + msg.routingComplete(_messageStore, factory); + if(msg.allContentReceived()) + { + msg.deliverToQueues(); + } // we manually send the message to the subscription - _subscription.send(new QueueEntry(_queue,msg), _queue); + //_subscription.send(new QueueEntry(_queue,msg), _queue); } } @@ -155,16 +163,13 @@ */ public void testAckChannelAssociationTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - //DTX - // assertTrue(_messageStore.getNumberStoredMessages() == msgCount); Set<Long> deliveryTagSet = map.getDeliveryTags(); int i = 1; @@ -172,15 +177,12 @@ { assertTrue(deliveryTag == i); i++; - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); } assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - //DTX -// assertTrue(_messageStore.getNumberStoredMessages() == msgCount); } /** @@ -189,15 +191,32 @@ public void testNoAckMode() throws AMQException { // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, false); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 0); assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - //DTX MessageStore -// assertTrue(_messageStore.getNumberStoredMessages() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + + } + + /** + * Tests that in no-ack mode no messages are retained + */ + public void testPersistentNoAckMode() throws AMQException + { + // false arg means no acks expected + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); + final int msgCount = 10; + publishMessages(msgCount, true); + + UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); + assertTrue(_messageStore.getContentBodyMap().size() == 0); + } /** @@ -206,7 +225,7 @@ */ public void testSingleAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -219,7 +238,7 @@ for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); // 5 is the delivery tag of the message that *should* be removed if (++i == 5) @@ -235,7 +254,7 @@ */ public void testMultiAckReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -248,7 +267,7 @@ for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } @@ -259,7 +278,7 @@ */ public void testMultiAckAllReceivedTest() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); final int msgCount = 10; publishMessages(msgCount); @@ -272,18 +291,19 @@ for (long deliveryTag : deliveryTagSet) { assertTrue(deliveryTag == i + 5); - UnacknowledgedMessage unackedMsg = map.get(deliveryTag); + QueueEntry unackedMsg = map.get(deliveryTag); assertTrue(unackedMsg.getQueue() == _queue); ++i; } } +/* public void testPrefetchHighLow() throws AMQException { int lowMark = 5; int highMark = 10; - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); _channel.setPrefetchLowMarkCount(lowMark); _channel.setPrefetchHighMarkCount(highMark); @@ -332,10 +352,12 @@ assertTrue(map.size() == 0); } +*/ +/* public void testPrefetch() throws AMQException { - _subscription = new SubscriptionImpl(5, _protocolSession, DEFAULT_CONSUMER_TAG, true); - _channel.setPrefetchCount(5); + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); + _channel.setMessageCredit(5); assertTrue(_channel.getPrefetchCount() == 5); @@ -360,6 +382,7 @@ assertTrue(map.size() == 0); } +*/ public static junit.framework.Test suite() { return new junit.framework.TestSuite(AckTest.class);
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Thu Jun 19 02:01:59 2008 @@ -24,7 +24,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.NullApplicationRegistry; @@ -39,7 +39,7 @@ class MessageTestHelper extends TestCase { - private final MessageStore _messageStore = new MemoryMessageStore(); + private final MessageStore _messageStore = new SkeletonMessageStore(); private final StoreContext _storeContext = new StoreContext(); @@ -52,12 +52,12 @@ ApplicationRegistry.initialise(new NullApplicationRegistry()); } - QueueEntry message() throws AMQException + QueueEntryImpl message() throws AMQException { return message(false); } - QueueEntry message(final boolean immediate) throws AMQException + QueueEntryImpl message(final boolean immediate) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -87,9 +87,16 @@ return null; } }; - - return new QueueEntry(null,new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext, - new ContentHeaderBody())); + + //public AMQMessage(Long messageId, AMQMessageHandle messageHandle , TransactionalContext txnConext, MessagePublishInfo info) + long messageId = _messageStore.getNewMessageId(); + final AMQMessageHandle messageHandle = + (new MessageHandleFactory()).createMessageHandle(messageId, _messageStore, false); + messageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,new ContentHeaderBody()); + AMQMessage msg = new AMQMessage(messageHandle, _txnContext.getStoreContext(), publish); + + + return new QueueEntryImpl(null,msg, Long.MIN_VALUE); } } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Thu Jun 19 02:01:59 2008 @@ -215,6 +215,11 @@ return null; //To change body of implemented methods use File | Settings | File Templates. } + public ProtocolSessionIdentifier getSessionIdentifier() + { + return null; + } + public byte getProtocolMajorVersion() { return getProtocolVersion().getMajorVersion(); Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=669431&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java (added) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java Thu Jun 19 02:01:59 2008 @@ -0,0 +1,176 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +import javax.jms.*; +import javax.naming.NamingException; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.HashMap; +import java.util.Map; + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class PriorityTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(PriorityTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "/test"; + protected final String QUEUE = "PriorityQueue"; + + + private static final int MSG_COUNT = 50; + + protected void setUp() throws Exception + { + super.setUp(); + + if (usingInVMBroker()) + { + TransportConnection.createVMBroker(1); + } + + + } + + private boolean usingInVMBroker() + { + return BROKER.startsWith("vm://"); + } + + protected void tearDown() throws Exception + { + if (usingInVMBroker()) + { + TransportConnection.killAllVMBrokers(); + } + super.tearDown(); + } + + public void testPriority() throws JMSException, NamingException, AMQException + { + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:[EMAIL PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + Context context = factory.getInitialContext(env); + + Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-priorities",10); + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + + Queue queue = new AMQQueue("amq.direct",QUEUE); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + + + + + + + producerConnection.start(); + + + MessageProducer producer = producerSession.createProducer(queue); + + + + + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.setPriority(msg % 10); + producer.send(nextMessage(msg, false, producerSession, producer)); + } + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + + + + consumerConnection.start(); + + Message received; + //Receive Message 0 + StringBuilder buf = new StringBuilder(); + int receivedCount = 0; + Message previous = null; + int messageCount = 0; + while((received = consumer.receive(1000))!=null) + { + messageCount++; + if(previous != null) + { + assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) ); + } + + previous = received; + receivedCount++; + } + + assertEquals("Incorrect number of message received", 50, receivedCount); + + producerSession.close(); + producer.close(); + + } + + private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + send.setIntProperty("msg", msg); + + return send; + } + + +} Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java Thu Jun 19 02:01:59 2008 @@ -81,14 +81,6 @@ System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG)); System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE)); - Logger csdm = Logger.getLogger(ConcurrentSelectorDeliveryManager.class); - System.err.println("csdm.isE-Error:" + csdm.isEnabledFor(Level.ERROR)); - System.err.println("csdm.isE-Warn:" + csdm.isEnabledFor(Level.WARN)); - System.err.println("csdm.isInfo:" + csdm.isInfoEnabled() + ":" + csdm.isEnabledFor(Level.INFO)); - System.err.println("csdm.isDebug:" + csdm.isDebugEnabled() + ":" + csdm.isEnabledFor(Level.DEBUG)); - System.err.println("csdm.isTrace:" + csdm.isTraceEnabled() + ":" + csdm.isEnabledFor(Level.TRACE)); - - System.err.println(Logger.getRootLogger().getLoggerRepository()); if (BROKER.startsWith("vm://")) @@ -184,9 +176,14 @@ try { + Thread.sleep(2000); long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue")); assertEquals("Session reports Queue depth not as expected", 0, queueDepth); } + catch (InterruptedException e) + { + fail(e.getMessage()); + } catch (NamingException e) { fail(e.getMessage()); @@ -209,7 +206,7 @@ } - private void verifyAllMessagesRecevied() throws JMSException + private void verifyAllMessagesRecevied() throws Exception { boolean[] msgIdRecevied = new boolean[MSG_COUNT]; @@ -219,6 +216,8 @@ _messages[i] = _consumer.receive(1000); assertNotNull("should have received a message but didn't", _messages[i]); } + long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _context.lookup("queue")); + assertEquals("Session reports Queue depth not as expected", 0, queueDepth); //Check received messages int msgId = 0; Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu Jun 19 02:01:59 2008 @@ -21,6 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.framing.AMQShortString; import java.util.ArrayList; import java.util.List; @@ -54,7 +56,12 @@ return messages; } - public void send(QueueEntry msg, AMQQueue queue) + public void setQueue(AMQQueue queue) + { + + } + + public void send(QueueEntry msg) { messages.add(msg); } @@ -79,9 +86,39 @@ //no-op } - public Object getSendLock() + public void getSendLock() + { + return; + } + + public void releaseSendLock() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resend(final QueueEntry entry) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit(final QueueEntry queueEntry) + { + + } + + public void setStateListener(final StateListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public QueueEntry getLastSeenEntry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) { - return new Object(); + return false; //To change body of implemented methods use File | Settings | File Templates. } public AMQChannel getChannel() @@ -94,6 +131,26 @@ //no-op } + public AMQShortString getConsumerTag() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isActive() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQQueue getQueue() + { + return null; + } + + public QueueEntry.SubscriptionAcquiredState getOwningState() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void queueDeleted(AMQQueue queue) { } @@ -108,6 +165,11 @@ return true; } + public boolean isAutoClose() + { + return false; + } + public Queue<QueueEntry> getPreDeliveryQueue() { return null; @@ -157,5 +219,4 @@ { return key.toString(); } - } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Thu Jun 19 02:01:59 2008 @@ -22,12 +22,11 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.Exchange; @@ -130,17 +129,17 @@ return null; } - public void removeQueue(AMQShortString name) throws AMQException + public void removeQueue(final AMQQueue queue) throws AMQException { } - public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { } - public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException + public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { } Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Thu Jun 19 02:01:59 2008 @@ -28,6 +28,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.AMQMessageHandle; import org.apache.qpid.server.txn.NonTransactionalContext; /** @@ -39,6 +40,7 @@ private StoreContext _storeContext = new StoreContext(); + protected void setUp() throws Exception { super.setUp(); @@ -50,7 +52,7 @@ */ public void testMessageGetsRemoved() throws AMQException { - createPersistentContentHeader(); + ContentHeaderBody chb = createPersistentContentHeader(); MessagePublishInfo info = new MessagePublishInfo() { @@ -81,16 +83,22 @@ } }; - AMQMessage message = new AMQMessage(_store.getNewMessageId(), info, - new NonTransactionalContext(_store, _storeContext, null, null), - createPersistentContentHeader()); + + final long messageId = _store.getNewMessageId(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext,info); + message = message.takeReference(); // we call routing complete to set up the handle - message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + assertEquals(1, _store.getMessageMetaDataMap().size()); message.decrementReference(_storeContext); - assertTrue(_store.getMessageMetaDataMap().size() == 0); + assertEquals(1, _store.getMessageMetaDataMap().size()); } private ContentHeaderBody createPersistentContentHeader() @@ -134,18 +142,25 @@ } }; - AMQMessage message = new AMQMessage(_store.getNewMessageId(), - info, - new NonTransactionalContext(_store, _storeContext, null, null), - createPersistentContentHeader()); + final Long messageId = _store.getNewMessageId(); + final ContentHeaderBody chb = createPersistentContentHeader(); + AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); + messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); + AMQMessage message = new AMQMessage(messageHandle, + _storeContext, + info); + message = message.takeReference(); // we call routing complete to set up the handle - message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); + + + + assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); message.decrementReference(_storeContext); - assertTrue(_store.getMessageMetaDataMap().size() == 1); + assertEquals(1, _store.getMessageMetaDataMap().size()); } public static junit.framework.Test suite() Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java?rev=669431&r1=669430&r2=669431&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java Thu Jun 19 02:01:59 2008 @@ -110,13 +110,10 @@ { try { - long remainingMessages = ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue); - fail("The queue should have 0 msgs left, seen " + _msgCount + " messages, left: " - + remainingMessages); - } - catch (AMQException e) - { - fail("Got AMQException" + e.getMessage()); + if(_msgCount != MSG_COUNT) + { + assertEquals("Wrong number of messages seen.", MSG_COUNT, _msgCount); + } } finally { @@ -124,7 +121,6 @@ _awaitCompletion.countDown(); } } - } catch (JMSException e) { @@ -147,6 +143,11 @@ fail("Unable to wait for test completion"); throw e; } + + + // wait for the ack to get back + Thread.sleep(1000); + assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue)); } Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=669431&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java (added) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java Thu Jun 19 02:01:59 2008 @@ -0,0 +1,210 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.Enumeration; + +public class FlowControlTest extends VMTestCase +{ + private static final Logger _logger = Logger.getLogger(FlowControlTest.class); + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + public void setUp() throws Exception + { + + super.setUp(); + + + } + + /** + * Simply + */ + public void testBasicBytesFlowControl() throws JMSException, NamingException, AMQException + { + _queue = new AMQQueue("amq.direct","testqueue");//(Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg",1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[128]); + m2.setIntProperty("msg",2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[256]); + m3.setIntProperty("msg",3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8)consumerSession).setPrefecthLimits(0,256); + MessageConsumer recv = consumerSession.createConsumer(_queue); + consumerConnection.start(); + + Message r1 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + Message r2 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r1.acknowledge(); + + r3 = recv.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r2.acknowledge(); + + + r3 = recv.receive(RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + r3.acknowledge(); + recv.close(); + consumerSession.close(); + consumerConnection.close(); + + } + + public void testTwoConsumersBytesFlowControl() throws JMSException, NamingException, AMQException + { + _queue = new AMQQueue("amq.direct","testqueue1");//(Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(_queue); + + BytesMessage m1 = producerSession.createBytesMessage(); + m1.writeBytes(new byte[128]); + m1.setIntProperty("msg",1); + producer.send(m1); + BytesMessage m2 = producerSession.createBytesMessage(); + m2.writeBytes(new byte[256]); + m2.setIntProperty("msg",2); + producer.send(m2); + BytesMessage m3 = producerSession.createBytesMessage(); + m3.writeBytes(new byte[128]); + m3.setIntProperty("msg",3); + producer.send(m3); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + + Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8)consumerSession1).setPrefecthLimits(0,256); + MessageConsumer recv1 = consumerSession1.createConsumer(_queue); + + consumerConnection.start(); + + Message r1 = recv1.receive(RECEIVE_TIMEOUT); + assertNotNull("First message not received", r1); + assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); + + + Message r2 = recv1.receiveNoWait(); + assertNull("Second message incorrectly delivered", r2); + + Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ((AMQSession_0_8)consumerSession2).setPrefecthLimits(0,256); + MessageConsumer recv2 = consumerSession2.createConsumer(_queue); + + + r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT); + assertNotNull("Second message not received", r2); + assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); + + Message r3 = recv2.receiveNoWait(); + assertNull("Third message incorrectly delivered", r3); + + r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT); + assertNotNull("Third message not received", r3); + assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); + + + + r2.acknowledge(); + r3.acknowledge(); + recv1.close(); + recv2.close(); + consumerSession1.close(); + consumerSession2.close(); + consumerConnection.close(); + + } + +}
