Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=517638&r1=517637&r2=517638 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Mar 13 03:35:42 2007 @@ -39,8 +39,8 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.testutil.VMBrokerSetup; public class PropertyValueTest extends TestCase implements MessageListener { @@ -59,19 +59,13 @@ protected void setUp() throws Exception { super.setUp(); - try - { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); - } - catch (Exception e) - { - fail("Unable to initialilse connection: " + e); - } + TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killVMBroker(1); } private void init(AMQConnection connection) throws Exception @@ -91,14 +85,48 @@ connection.start(); } - public void test() throws Exception + public void testOnce() + { + runBatch(1); + } + + public void test50() + { + runBatch(50); + } + + private void runBatch(int runSize) { - int count = _count; - send(count); - waitFor(count); - check(); - _logger.info("Completed without failure"); - _connection.close(); + try + { + int run = 0; + while (run < runSize) + { + _logger.error("Run Number:" + run++); + try + { + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + } + catch (Exception e) + { + fail("Unable to initialilse connection: " + e); + } + + int count = _count; + send(count); + waitFor(count); + check(); + _logger.info("Completed without failure"); + _connection.close(); + + _logger.error("End Run Number:" + (run - 1)); + } + } + catch (Exception e) + { + _logger.error(e.getMessage(), e); + e.printStackTrace(); + } } void send(int count) throws JMSException @@ -138,7 +166,7 @@ m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.info("Message:" + m); + _logger.trace("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); @@ -150,7 +178,7 @@ m.setShortProperty("Short", (short) Short.MAX_VALUE); m.setStringProperty("String", "Test"); - _logger.info("Sending Msg:" + m); + _logger.debug("Sending Msg:" + m); producer.send(m); } } @@ -206,8 +234,11 @@ Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); } + received.clear(); assertEqual(messages.iterator(), actual.iterator()); + + messages.clear(); } private static void assertEqual(Iterator expected, Iterator actual) @@ -269,11 +300,11 @@ { test._count = Integer.parseInt(argv[1]); } - test.test(); + test.testOnce(); } public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class)); + return new junit.framework.TestSuite(PropertyValueTest.class); } }
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=diff&rev=517638&r1=517637&r2=517638 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original) +++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Mar 13 03:35:42 2007 @@ -42,6 +42,7 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; +import org.apache.qpid.testutil.QpidClientConnection; import org.apache.log4j.Logger; import org.apache.log4j.Level; @@ -62,14 +63,14 @@ private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed=false; + private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue @@ -85,21 +86,28 @@ { super.tearDown(); - if (!passed) + if (!passed) // clean up { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue conn.consume(queue, consumeTimeout); + + conn.disconnect(); } TransportConnection.killVMBroker(1); } - /** multiple consumers */ + /** + * multiple consumers + * + * @throws javax.jms.JMSException if a JMS problem occurs + * @throws InterruptedException on timeout + */ public void testDrain() throws JMSException, InterruptedException { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -170,6 +178,7 @@ assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); + passed = true; } /** multiple consumers */ @@ -186,8 +195,8 @@ Thread t4 = new Thread(c4); t1.start(); -// t2.start(); -// t3.start(); + t2.start(); + t3.start(); // t4.start(); try @@ -230,7 +239,7 @@ } assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); - passed=true; + passed = true; } class Consumer implements Runnable @@ -248,7 +257,7 @@ try { _logger.info("consumer-" + id + ": starting"); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -318,286 +327,51 @@ } - public class QpidClientConnection implements ExceptionListener + public void testRequeue() throws JMSException, AMQException, URLSyntaxException { - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection() + int run = 0; + while (run < 10) { - super(); - setVirtualHost("/test"); - setBrokerList(BROKER); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:[EMAIL PROTECTED]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + run++; - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } - - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) + if (_logger.isInfoEnabled()) { - connect(); + _logger.info("testRequeue run " + run); } - Queue queue = session.createQueue(queueName); + String virtualHost = "/test"; + String brokerlist = BROKER; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - final MessageConsumer consumer = session.createConsumer(queue); + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue q = session.createQueue(queue); - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; + _logger.debug("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) - { - result = null; - } - else + try { - _logger.info("warning: received non-text message"); - result = message; + Thread.sleep(2000); } - - return result; - } - - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } - - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) + catch (InterruptedException e) { - connect(); + // } - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); + _logger.debug("Receiving msg"); + Message msg = consumer.receive(1000); - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; + assertNotNull("Message should not be null", msg); - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - session.commit(); + // As we have not ack'd message will be requeued. + _logger.debug("Close Consumer"); consumer.close(); - _logger.info("consumed: " + messagesReceived); - } - } - - - public void testRequeue() throws JMSException, AMQException, URLSyntaxException - { - String virtualHost = "/test"; - String brokerlist = "vm://:1"; - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - - Connection conn = new AMQConnection(brokerUrl); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue q = session.createQueue(queue); - - _logger.info("Create Consumer"); - MessageConsumer consumer = session.createConsumer(q); - try - { - Thread.sleep(2000); - } - catch (InterruptedException e) - { - // + _logger.debug("Close Connection"); + conn.close(); } - - _logger.info("Receiving msg"); - Message msg = consumer.receive(); - - assertNotNull("Message should not be null", msg); - - _logger.info("Close Consumer"); - consumer.close(); - - _logger.info("Close Connection"); - conn.close(); } } Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?view=auto&rev=517638 ============================================================================== --- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (added) +++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Tue Mar 13 03:35:42 2007 @@ -0,0 +1,268 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +public class QpidClientConnection implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:[EMAIL PROTECTED]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} + Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?view=diff&rev=517638&r1=517637&r2=517638 ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java Tue Mar 13 03:35:42 2007 @@ -181,8 +181,37 @@ @Override public Iterator<E> iterator() { - throw new RuntimeException("Not Implemented"); + final Iterator<E> mainMessageIterator = super.iterator(); + return new Iterator<E>() + { + final Iterator<E> _headIterator = _messageHead.iterator(); + final Iterator<E> _mainIterator = mainMessageIterator; + Iterator<E> last; + + public boolean hasNext() + { + return _headIterator.hasNext() || _mainIterator.hasNext(); + } + + public E next() + { + if (_headIterator.hasNext()) + { + last = _headIterator; + return _headIterator.next(); + } + else + { + last = _mainIterator; + return _mainIterator.next(); + } + } + public void remove() + { + last.remove(); + } + }; } @Override Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java?view=auto&rev=517638 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java (added) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java Tue Mar 13 03:35:42 2007 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test; + +import junit.extensions.TestSetup; +import junit.framework.Test; +import junit.framework.TestCase; + +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import java.util.Hashtable; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.HashMap; + +public class VMTestCase extends TestCase +{ + protected long RECEIVE_TIMEOUT = 1000l; // 1 sec + protected long CLOSE_TIMEOUT = 10000l; // 10 secs + + protected Context _context; + protected String _clientID; + protected String _virtualhost; + protected String _brokerlist; + + protected final Map<String, String> _connections = new HashMap<String, String>(); + protected final Map<String, String> _queues = new HashMap<String, String>(); + protected final Map<String, String> _topics = new HashMap<String, String>(); + + protected void setUp() throws Exception + { + super.setUp(); + try + { + TransportConnection.createVMBroker(1); + } + catch (Exception e) + { + fail("Unable to create broker: " + e); + } + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + if (_clientID == null) + { + _clientID = this.getClass().getName(); + } + + if (_virtualhost == null) + { + _virtualhost = "/test"; + } + + if (_brokerlist == null) + { + _brokerlist = "vm://:1"; + } + + env.put("connectionfactory.connection", "amqp://client:client@" + + _clientID + _virtualhost + "?brokerlist='" + _brokerlist + "'"); + + for (Map.Entry<String, String> c : _connections.entrySet()) + { + env.put("connectionfactory." + c.getKey(), c.getValue()); + } + + env.put("queue.queue", "queue"); + + for (Map.Entry<String, String> q : _queues.entrySet()) + { + env.put("queue." + q.getKey(), q.getValue()); + } + + env.put("topic.topic", "topic"); + + for (Map.Entry<String, String> t : _topics.entrySet()) + { + env.put("topic." + t.getKey(), t.getValue()); + } + + _context = factory.getInitialContext(env); + } + + protected void tearDown() throws Exception + { + TransportConnection.killVMBroker(1); + super.tearDown(); + } +} Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java?view=auto&rev=517638 ============================================================================== --- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java (added) +++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java Tue Mar 13 03:35:42 2007 @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; +import org.apache.log4j.Logger; + +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.QueueBrowser; +import javax.jms.TextMessage; +import javax.jms.JMSException; +import javax.jms.QueueReceiver; +import javax.jms.Message; +import java.util.Enumeration; + +public class QueueBrowserTest extends VMTestCase +{ + private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class); + + private static final int MSG_COUNT = 10; + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + public void setUp() throws Exception + { + + super.setUp(); + + _queue = (Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + } + + /* + * Test Messages Remain on Queue + * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there + * + */ + + public void queueBrowserMsgsRemainOnQueueTest() throws JMSException + { + + // create QueueBrowser + _logger.info("Creating Queue Browser"); + + QueueBrowser queueBrowser = _clientSession.createBrowser(_queue); + + // check for messages + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for " + MSG_COUNT + " messages with QueueBrowser"); + } + + int msgCount = 0; + Enumeration msgs = queueBrowser.getEnumeration(); + + while (msgs.hasMoreElements()) + { + msgs.nextElement(); + msgCount++; + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Found " + msgCount + " messages total in browser"); + } + + // check to see if all messages found +// assertEquals("browser did not find all messages", MSG_COUNT, msgCount); + if (msgCount != MSG_COUNT) + { + _logger.warn(msgCount + "/" + MSG_COUNT + " messages received."); + } + + //Close browser + queueBrowser.close(); + + // VERIFY + + // continue and try to receive all messages + MessageConsumer consumer = _clientSession.createConsumer(_queue); + + _logger.info("Verify messages are still on the queue"); + + Message tempMsg; + + for (msgCount = 0; msgCount < MSG_COUNT; msgCount++) + { + tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT); + if (tempMsg == null) + { + fail("Message " + msgCount + " not retrieved from queue"); + } + } + + _logger.info("All messages recevied from queue"); + } + + +} Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date
