Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original) +++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Fri Sep 28 03:41:49 2007 @@ -149,19 +149,21 @@ { int port = details.getPort(); - if (!_inVmPipeAddress.containsKey(port)) + synchronized (_inVmPipeAddress) { - if (AutoCreate) + if (!_inVmPipeAddress.containsKey(port)) { - createVMBroker(port); - } - else - { - throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); + if (AutoCreate) + { + createVMBroker(port); + } + else + { + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. 
Auto create disabled.", null); + } } } - return new VmPipeTransportConnection(port); } @@ -176,69 +178,71 @@ config.setThreadModel(ReadWriteThreadModel.getInstance()); } - if (!_inVmPipeAddress.containsKey(port)) + synchronized (_inVmPipeAddress) { - _logger.info("Creating InVM Qpid.AMQP listening on port " + port); - IoHandlerAdapter provider = null; - try + if (!_inVmPipeAddress.containsKey(port)) { - VmPipeAddress pipe = new VmPipeAddress(port); - - provider = createBrokerInstance(port); - - _acceptor.bind(pipe, provider); - - _inVmPipeAddress.put(port, pipe); - _logger.info("Created InVM Qpid.AMQP listening on port " + port); - } - catch (IOException e) - { - _logger.error("Got IOException.", e); - - // Try and unbind provider + _logger.info("Creating InVM Qpid.AMQP listening on port " + port); + IoHandlerAdapter provider = null; try { VmPipeAddress pipe = new VmPipeAddress(port); - try - { - _acceptor.unbind(pipe); - } - catch (Exception ignore) - { - // ignore - } - - if (provider == null) - { - provider = createBrokerInstance(port); - } + provider = createBrokerInstance(port); _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } - catch (IOException justUseFirstException) + catch (IOException e) { - String because; - if (e.getCause() == null) + _logger.error("Got IOException.", e); + + // Try and unbind provider + try { - because = e.toString(); + VmPipeAddress pipe = new VmPipeAddress(port); + + try + { + _acceptor.unbind(pipe); + } + catch (Exception ignore) + { + // ignore + } + + if (provider == null) + { + provider = createBrokerInstance(port); + } + + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); + _logger.info("Created InVM Qpid.AMQP listening on port " + port); } - else + catch (IOException justUseFirstException) { - because = e.getCause().toString(); - } + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + 
because = e.getCause().toString(); + } - throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + } } } + else + { + _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); + } } - else - { - _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); - } - } private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException @@ -285,25 +289,29 @@ { _logger.info("Killing all VM Brokers"); _acceptor.unbindAll(); - - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) + synchronized (_inVmPipeAddress) { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } + Iterator keys = _inVmPipeAddress.keySet().iterator(); + while (keys.hasNext()) + { + int id = (Integer) keys.next(); + _inVmPipeAddress.remove(id); + } + } } public static void killVMBroker(int port) { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); - if (pipe != null) + synchronized (_inVmPipeAddress) { - _logger.info("Killing VM Broker:" + port); - _inVmPipeAddress.remove(port); - _acceptor.unbind(pipe); + VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); + if (pipe != null) + { + _logger.info("Killing VM Broker:" + port); + _inVmPipeAddress.remove(port); + _acceptor.unbind(pipe); + } } }
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java Fri Sep 28 03:41:49 2007 @@ -68,6 +68,7 @@ protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } private void init(AMQConnection connection) throws Exception Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java Fri Sep 28 03:41:49 2007 @@ -48,6 +48,7 @@ protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } /** Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?rev=580293&r1=580292&r2=580293&view=diff 
============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Fri Sep 28 03:41:49 2007 @@ -125,6 +125,7 @@ protected void tearDown() throws Exception { closeConnection(); + TransportConnection.killAllVMBrokers(); super.tearDown(); } Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java Fri Sep 28 03:41:49 2007 @@ -20,8 +20,11 @@ */ package org.apache.qpid.test.unit.client.connection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import javax.jms.JMSException; import javax.jms.Message; @@ -30,14 +33,20 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import junit.framework.TestCase; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; 
-import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.transport.TransportConnection; - +/** + * ConnectionStartTest: + * This test verifies that a fresh connection is not started and no messages are delivered until the connection is + * started. + * + * After the connection is started then the message should be there, and the connection started. + * + * This Test verifies that using receive() and a messageListener does not cause message delivery before start is called. + * + */ public class ConnectionStartTest extends TestCase { @@ -54,11 +63,18 @@ try { + // Create Consumer Connection + _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); - AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + Queue queue = _consumerSess.createQueue("ConnectionStartTest"); - AMQQueue queue = new AMQQueue(pubCon,"ConnectionStartTest"); + _consumer = _consumerSess.createConsumer(queue); + + + // Create Producer Connection to send message + AMQConnection pubCon = new AMQConnection(_broker, "guest", "guest", "fred", "test"); Session pubSess = pubCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); @@ -66,12 +82,6 @@ pub.send(pubSess.createTextMessage("Initial Message")); - _connection = new AMQConnection(_broker, "guest", "guest", "fred", "test"); - - _consumerSess = _connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); - - _consumer = _consumerSess.createConsumer(queue); - pubCon.close(); } @@ -85,6 +95,7 @@ { _connection.close(); TransportConnection.killVMBroker(1); + super.tearDown(); } public void testSimpleReceiveConnection() @@ -94,9 +105,9 @@ assertTrue("Connection should not be started", !_connection.started()); //Note that this next line will start the dispatcher in the session // should really not be called before _connection start - assertTrue("There should not be messages waiting for the consumer", 
_consumer.receiveNoWait() == null); + assertNull("There should not be messages waiting for the consumer", _consumer.receiveNoWait()); _connection.start(); - assertTrue("There should be messages waiting for the consumer", _consumer.receive(1000) == null); + assertNotNull("There should be messages waiting for the consumer", _consumer.receive(1000)); assertTrue("Connection should be started", _connection.started()); } @@ -131,7 +142,11 @@ } }); + // Ensure that setting a ML doesn't start the connection assertTrue("Connection should not be started", !_connection.started()); + // Ensure that the message wasn't delivered while the connection was stopped. + assertEquals("Message latch should still be set",1,_gotMessage.getCount()); + _connection.start(); assertTrue("Connection should be started", _connection.started()); Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Sep 28 03:41:49 2007 @@ -434,6 +434,13 @@ verifyMessages(_consumer.receive(1000)); } + /** + * This test sends two messages and receives one of them but doesn't ack it. + * The consumer is then closed + * the first message should be returned as redelivered. + * the second message should be delivered normally. 
+ * @throws Exception + */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original) +++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Fri Sep 28 03:41:49 2007 @@ -711,7 +711,10 @@ if (trace) { _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); - _logger.trace(_properties.toString()); + if (_properties != null) + { + _logger.trace(_properties.toString()); + } } EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original) +++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Sep 28 03:41:49 2007 @@ -885,24 +885,8 @@ synchronized (_sendPauseMonitor) { if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) - // && (_sendPauseBarrier.getNumberWaiting() == 1)) { - // log.debug("unreceived size estimate under limit = " + unreceivedSize); - - // Wait on the send pause barrier for the limit to 
be re-established. - /*try - {*/ - // _sendPauseBarrier.await(); _sendPauseMonitor.notify(); - /*} - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - catch (BrokenBarrierException e) - { - throw new RuntimeException(e); - }*/ } } @@ -1159,12 +1143,23 @@ // If necessary, wait until the max pending message size comes within its limit. synchronized (_sendPauseMonitor) { + // Used to keep track of the number of times that send has to wait. + int numWaits = 0; + + // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with + // the test timeout. + int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); + while ((_maxPendingSize > 0)) { // Get the size estimate of sent but not yet received messages. int unreceived = _unreceived.get(); int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + // log.debug("_maxPendingSize = " + _maxPendingSize); + if (unreceivedSize > _maxPendingSize) { // log.debug("unreceived size estimate over limit = " + unreceivedSize); @@ -1172,8 +1167,8 @@ // Wait on the send pause barrier for the limit to be re-established. try { - // _sendPauseBarrier.await(); - _sendPauseMonitor.wait(1000); + _sendPauseMonitor.wait(10000); + numWaits++; } catch (InterruptedException e) { @@ -1181,10 +1176,17 @@ Thread.currentThread().interrupt(); throw new RuntimeException(e); } - /*catch (BrokenBarrierException e) + + // Fail the test if the send has had to wait more than the maximum allowed number of times. 
+ if (numWaits >= waitLimit) { - throw new RuntimeException(e); - }*/ + String errorMessage = + "Send has had to wait for the unreceivedSize (" + unreceivedSize + + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit + + " times."; + log.warn(errorMessage); + throw new RuntimeException(errorMessage); + } } else { Modified: incubator/qpid/branches/M2/java/pom.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/pom.xml?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/pom.xml (original) +++ incubator/qpid/branches/M2/java/pom.xml Fri Sep 28 03:41:49 2007 @@ -506,7 +506,7 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> - <version>1.0</version> + <version>1.4.3</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original) +++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Sep 28 03:41:49 2007 @@ -33,6 +33,14 @@ */ public class TestableMemoryMessageStore extends MemoryMessageStore { + + MemoryMessageStore _mms = null; + + public TestableMemoryMessageStore(MemoryMessageStore mms) + { + _mms = mms; + } + public TestableMemoryMessageStore() { _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); @@ -41,11 +49,25 @@ public ConcurrentMap<Long, MessageMetaData> 
getMessageMetaDataMap() { - return _metaDataMap; + if (_mms != null) + { + return _mms._metaDataMap; + } + else + { + return _metaDataMap; + } } public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() { - return _contentBodyMap; + if (_mms != null) + { + return _mms._contentBodyMap; + } + else + { + return _contentBodyMap; + } } } Modified: incubator/qpid/branches/M2/python/tests/basic.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/python/tests/basic.py?rev=580293&r1=580292&r2=580293&view=diff ============================================================================== --- incubator/qpid/branches/M2/python/tests/basic.py (original) +++ incubator/qpid/branches/M2/python/tests/basic.py Fri Sep 28 03:41:49 2007 @@ -339,9 +339,11 @@ channel = self.channel channel.queue_declare(queue="test-get", exclusive=True) - #publish some messages (no_ack=True) + #publish some messages (no_ack=True) with persistent messaging for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + msg=Content("Message %d" % i) + msg["delivery mode"] = 2 + channel.basic_publish(routing_key="test-get",content=msg ) #use basic_get to read back the messages, and check that we get an empty at the end for i in range(1, 11): @@ -354,18 +356,53 @@ self.assertEqual(reply.method.klass.name, "basic") self.assertEqual(reply.method.name, "get-empty") - #repeat for no_ack=False + + #publish some messages (no_ack=True) transient messaging for i in range(11, 21): channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + #use basic_get to read back the messages, and check that we get an empty at the end for i in range(11, 21): + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, 
"basic") + self.assertEqual(reply.method.name, "get-empty") + + #repeat for no_ack=False + + #publish some messages (no_ack=False) with persistent messaging + for i in range(21, 31): + msg=Content("Message %d" % i) + msg["delivery mode"] = 2 + channel.basic_publish(routing_key="test-get",content=msg ) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(21, 31): + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + #public some messages (no_ack=False) with transient messaging + for i in range(31, 41): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + for i in range(31, 41): reply = channel.basic_get(no_ack=False) self.assertEqual(reply.method.klass.name, "basic") self.assertEqual(reply.method.name, "get-ok") self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): + if(i == 33): channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): + if(i in [35, 37, 39]): channel.basic_ack(delivery_tag=reply.delivery_tag) reply = channel.basic_get(no_ack=True) @@ -375,8 +412,8 @@ #recover(requeue=True) channel.basic_recover(requeue=True) - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: + #get the unacked messages again (34, 36, 38, 40) + for i in [34, 36, 38, 40]: reply = channel.basic_get(no_ack=False) self.assertEqual(reply.method.klass.name, "basic") self.assertEqual(reply.method.name, "get-ok")
