Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java?view=diff&rev=551207&r1=551206&r2=551207 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java Wed Jun 27 08:49:51 2007 @@ -25,10 +25,11 @@ import org.apache.log4j.NDC; import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.server.registry.ApplicationRegistry; import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +42,11 @@ import javax.naming.InitialContext; import javax.naming.NamingException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; /** * ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS @@ -58,6 +62,10 @@ * connected. * <tr><td> Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is * connected. + * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when a consumer is + * disconnected. + * <tr><td> Check that an immediate message results in no consumers code, in a transaction, when a consumer is + * disconnected. 
* </table> * * @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties, @@ -73,63 +81,215 @@ private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class); /** Used to read the tests configurable properties through. */ - ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + ParsedProperties testProps; - /** All these tests should have the immediate flag on. */ - private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true); + /** Used to create unique destination names for each test. + * @todo Move into the test framework. + */ + private static AtomicLong uniqueDestsId = new AtomicLong(); /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */ - public void test_QPID_517_ImmediateOkNoTx() throws Exception + public void test_QPID_517_ImmediateOkNoTxP2P() throws Exception { // Ensure transactional sessions are off. testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, false); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); // Send one message with no errors. - PublisherReceiverImpl.testNoExceptions(testProps); + testClients.testNoExceptions(testProps); } /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */ - public void test_QPID_517_ImmediateOkTx() throws Exception + public void test_QPID_517_ImmediateOkTxP2P() throws Exception { // Ensure transactional sessions are off. testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, false); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); // Send one message with no errors. 
- PublisherReceiverImpl.testNoExceptions(testProps); + testClients.testNoExceptions(testProps); + } + + /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */ + public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, false); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message and get a linked no consumers exception. + testClients.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */ + public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, false); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message and get a linked no consumers exception. + testClients.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */ + public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). 
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Send one message and get a linked no consumers exception. + testClients.testWithAssertions(testProps, AMQNoRouteException.class); + } + + /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */ + public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Send one message and get a linked no consumers exception. + testClients.testWithAssertions(testProps, AMQNoRouteException.class); + } + + /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */ + public void test_QPID_517_ImmediateOkNoTxPubSub() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, true); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */ + public void test_QPID_517_ImmediateOkTxPubSub() throws Exception + { + // Ensure transactional sessions are off. 
+ testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, true); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */ + public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, true); + + // Use durable subscriptions, so that the route remains open with no subscribers. + testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message and get a linked no consumers exception. + testClients.testWithAssertions(testProps, AMQNoConsumersException.class); + } + + /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */ + public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, true); + + // Use durable subscriptions, so that the route remains open with no subscribers. + testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); + + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message and get a linked no consumers exception. 
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class); } /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */ - public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception + public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception { // Ensure transactional sessions are off. testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, true); // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to // collect its messages). testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + // Send one message and get a linked no consumers exception. - PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class); + testClients.testWithAssertions(testProps, AMQNoRouteException.class); } /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */ - public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception + public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception { // Ensure transactional sessions are on. testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, true); // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to // collect its messages). testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps); + // Send one message and get a linked no consumers exception. 
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class); + testClients.testWithAssertions(testProps, AMQNoRouteException.class); } protected void setUp() throws Exception { NDC.push(getName()); + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /** All these tests should have the immediate flag on. */ + testProps.setProperty(IMMEDIATE_PROPNAME, true); + + /** Bind the receivers consumer by default. */ + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true); + // Ensure that the in-vm broker is created. TransportConnection.createVMBroker(1); } @@ -220,10 +380,64 @@ return false; } + /** + * Reports the number of exceptions held by this monitor. + * + * @return The number of exceptions held by this monitor. + */ + public int size() + { + return exceptions.size(); + } + public void reset() { exceptions = new ArrayList(); } + + /** + * Provides a dump of the stack traces of all exceptions that this exception monitor was notified of. Mainly + * use for debugging/test failure reporting purposes. + * + * @return A string containing a dump of the stack traces of all exceptions. + */ + public String toString() + { + String result = "ExceptionMonitor: holds " + exceptions.size() + " exceptions.\n\n"; + + for (JMSException ex : exceptions) + { + result += getStackTrace(ex) + "\n"; + } + + return result; + } + + /** + * Prints an exception stack trace into a string. + * + * @param t The throwable to get the stack trace from. + * + * @return A string containing the throwables stack trace. 
+ */ + public static String getStackTrace(Throwable t) + { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw, true); + t.printStackTrace(pw); + pw.flush(); + sw.flush(); + + return sw.toString(); + } + } + + public static class MessageMonitor implements MessageListener + { + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message): called"); + } } /** @@ -290,22 +504,30 @@ { try { - int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME); - boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME); - String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME); - String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME); + // Get a unique offset to append to destination names to make them unique to the connection. + long uniqueId = uniqueDestsId.incrementAndGet(); + + // Extract the standard test configuration parameters relevant to the connection. + String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId; + String destinationReceiveRoot = + messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId; boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME); boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME); boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME); boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME); + + // Check which JMS flags and options are to be set. 
+ int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME); + boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME); boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME); + boolean durableSubscription = messagingProps.getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME); // Check if any Qpid/AMQP specific flags or options need to be set. boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME); boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME); boolean needsQpidOptions = immediate | mandatory; - log.debug("ackMode = " + ackMode); + /*log.debug("ackMode = " + ackMode); log.debug("useTopics = " + useTopics); log.debug("destinationSendRoot = " + destinationSendRoot); log.debug("destinationReceiveRoot = " + destinationReceiveRoot); @@ -316,7 +538,7 @@ log.debug("transactional = " + transactional); log.debug("immediate = " + immediate); log.debug("mandatory = " + mandatory); - log.debug("needsQpidOptions = " + needsQpidOptions); + log.debug("needsQpidOptions = " + needsQpidOptions);*/ // Create connection, sessions and producer/consumer pairs on each session. Connection connection = createConnection(messagingProps); @@ -329,7 +551,7 @@ Session receiverSession = connection.createSession(transactional, ackMode); Destination publisherProducerDestination = - useTopics ? publisherSession.createTopic(destinationSendRoot) + useTopics ? (Destination) publisherSession.createTopic(destinationSendRoot) : publisherSession.createQueue(destinationSendRoot); MessageProducer publisherProducer = @@ -342,13 +564,29 @@ createPublisherConsumer ? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null; + if (publisherConsumer != null) + { + publisherConsumer.setMessageListener(new MessageMonitor()); + } + MessageProducer receiverProducer = createReceiverProducer ? 
receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot)) : null; + Destination receiverConsumerDestination = + useTopics ? (Destination) receiverSession.createTopic(destinationSendRoot) + : receiverSession.createQueue(destinationSendRoot); + MessageConsumer receiverConsumer = - createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot)) - : null; + createReceiverConsumer + ? ((durableSubscription && useTopics) + ? receiverSession.createDurableSubscriber((Topic) receiverConsumerDestination, "testsub") + : receiverSession.createConsumer(receiverConsumerDestination)) : null; + + if (receiverConsumer != null) + { + receiverConsumer.setMessageListener(new MessageMonitor()); + } // Start listening for incoming messages. connection.start(); @@ -372,7 +610,8 @@ public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException { - return client.getSession().createMessage(); + return client.getSession().createTextMessage("Hello"); + // return client.getSession().createMessage(); } /** @@ -428,12 +667,12 @@ public MessageProducer getProducer() { - return null; + return producer; } public MessageConsumer getConsumer() { - return null; + return consumer; } public void send(Message message) throws JMSException @@ -524,6 +763,10 @@ public ExceptionMonitor getExceptionMonitor(); + public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */); + + public void testNoExceptions(ParsedProperties testProps); + public void close(); } @@ -603,61 +846,62 @@ } } - public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */) + public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */) { - PublisherReceiver testClients; - - // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. 
- testClients = createPublisherReceiverPairSharedConnection(testProps); - testClients.start(); - - testClients.send(testProps, 1); - + start(); + send(testProps, 1); pause(1000L); String errors = ""; - if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass)) + ExceptionMonitor connectionExceptionMonitor = getConnectionExceptionMonitor(); + if (!connectionExceptionMonitor.assertOneJMSExceptionWithLinkedCause(aClass)) { - errors += "Was expecting linked exception type " + aClass.getName() + ".\n"; + errors += "Was expecting linked exception type " + aClass.getName() + " on the connection.\n"; + errors += + (connectionExceptionMonitor.size() > 0) + ? ("Actually got the following exceptions on the connection, " + connectionExceptionMonitor) + : "Got no exceptions on the connection."; } // Clean up the publisher/receiver client pair. - testClients.close(); + close(); assertEquals(errors, "", errors); } /** */ - public static void testNoExceptions(ParsedProperties testProps) + public void testNoExceptions(ParsedProperties testProps) { - PublisherReceiver testClients; - - // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. 
- testClients = createPublisherReceiverPairSharedConnection(testProps); - testClients.start(); - - testClients.send(testProps, 1); - + start(); + send(testProps, 1); pause(1000L); String errors = ""; - if (!testClients.getConnectionExceptionMonitor().assertNoExceptions()) + if (!getConnectionExceptionMonitor().assertNoExceptions()) { - errors += "There were connection exceptions.\n"; + errors += "Was expecting no exceptions.\n"; + errors += "Got the following exceptions on the connection, " + getConnectionExceptionMonitor(); } - if (!testClients.getExceptionMonitor().assertNoExceptions()) + if (!getExceptionMonitor().assertNoExceptions()) { - errors += "There were exceptions on producer.\n"; + errors += "Was expecting no exceptions.\n"; + errors += "Got the following exceptions on the producer, " + getExceptionMonitor(); } // Clean up the publisher/receiver client pair. - testClients.close(); + close(); assertEquals(errors, "", errors); + } + + public static PublisherReceiver connectClients(ParsedProperties testProps) + { + // Create a standard publisher/receiver test client pair on a shared connection, individual sessions. + return createPublisherReceiverPairSharedConnection(testProps); } }
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java?view=diff&rev=551207&r1=551206&r2=551207 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java Wed Jun 27 08:49:51 2007 @@ -49,6 +49,10 @@ * connected. * <tr><td> Check that a mandatory message results in no route code, upon transaction commit, when a consumer is * connected. + * <tr><td> Check that a mandatory message is sent successfully, not using transactions, when a consumer is + * disconnected but the route exists. + * <tr><td> Check that a mandatory message is sent successfully, in a transaction, when a consumer is + * disconnected but the route exists. * </table> */ public class MandatoryMessageTest extends TestCase @@ -57,63 +61,233 @@ private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class); /** Used to read the tests configurable properties through. */ - ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + ParsedProperties testProps; - /** All these tests should have the mandatory flag on. */ - // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true); - private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true); + /** Check that a mandatory message is sent successfully not using transactions when a consumer is connected. */ + public void test_QPID_508_MandatoryOkNoTxP2P() throws Exception + { + // Ensure transactional sessions are off. 
+ testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, false); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */ + public void test_QPID_508_MandatoryOkTxP2P() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, false); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** + * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but + * the route exists. + */ + public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxP2P() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, false); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** + * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but + * the route exists. + */ + public void test_QPID_517_MandatoryOkConsumerDisconnectedTxP2P() throws Exception + { + // Ensure transactional sessions are on. 
+ testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, false); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */ + public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Send one message and get a linked no consumers exception. + testClients.testWithAssertions(testProps, AMQNoRouteException.class); + } + + /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */ + public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception + { + // Ensure transactional sessions are on. + testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, false); + + // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to + // collect its messages). + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Send one message and get a linked no consumers exception. 
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class); + } /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */ - public void test_QPID_508_MandatoryOkNoTx() throws Exception + public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception { // Ensure transactional sessions are off. testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, true); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); // Send one message with no errors. - ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps); + testClients.testNoExceptions(testProps); } /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */ - public void test_QPID_508_MandatoryOkTx() throws Exception + public void test_QPID_508_MandatoryOkTxPubSub() throws Exception + { + // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, true); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** + * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but + * the route exists. + */ + public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxPubSub() throws Exception { // Ensure transactional sessions are off. + testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, true); + + // Use durable subscriptions, so that the route remains open with no subscribers. 
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); + + // Send one message with no errors. + testClients.testNoExceptions(testProps); + } + + /** + * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but + * the route exists. + */ + public void test_QPID_517_MandatoryOkConsumerDisconnectedTxPubSub() throws Exception + { + // Ensure transactional sessions are on. testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, true); + + // Use durable subscriptions, so that the route remains open with no subscribers. + testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true); + + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + + // Disconnect the consumer. + testClients.getReceiver().getConsumer().close(); // Send one message with no errors. - ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps); + testClients.testNoExceptions(testProps); } /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */ - public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception + public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception { // Ensure transactional sessions are off. testProps.setProperty(TRANSACTED_PROPNAME, false); + testProps.setProperty(PUBSUB_PROPNAME, true); // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to // collect its messages). 
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + // Send one message and get a linked no consumers exception. - ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class); + testClients.testWithAssertions(testProps, AMQNoRouteException.class); } /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */ - public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception + public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception { // Ensure transactional sessions are on. testProps.setProperty(TRANSACTED_PROPNAME, true); + testProps.setProperty(PUBSUB_PROPNAME, true); // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to // collect its messages). testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false); + ImmediateMessageTest.PublisherReceiver testClients = + ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps); + // Send one message and get a linked no consumers exception. - ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class); + testClients.testWithAssertions(testProps, AMQNoRouteException.class); } protected void setUp() throws Exception { NDC.push(getName()); + + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + /** All these tests should have the mandatory flag on. */ + testProps.setProperty(MANDATORY_PROPNAME, true); + + /** Bind the receivers consumer by default. */ + testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true); // Ensure that the in-vm broker is created. 
TransportConnection.createVMBroker(1); Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java?view=diff&rev=551207&r1=551206&r2=551207 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java (original) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java Wed Jun 27 08:49:51 2007 @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.exchange; import org.apache.qpid.jms.Session; @@ -167,6 +187,12 @@ /** Defines the default message acknowledgement mode. */ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + /** Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. 
*/ + public static final String DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription"; + + /** Defines the default value of the durable subscriptions flag. */ + public static final boolean DURABLE_SUBSCRIPTION_DEFAULT = false; + // ====================== Qpid Options and Flags ================================ /** Holds the name of the property to set the exclusive flag from. */ @@ -272,6 +298,7 @@ defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT); defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT); defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
