Added: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java?view=auto&rev=512699 ============================================================================== --- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java (added) +++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java Wed Feb 28 03:05:20 2007 @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop; + +import java.util.Random; + +import javax.jms.*; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Listener implements the listening end of the Qpid interop tests. 
It is capable of being run as a standalone listener + * that responds to the test messages sent by the publishing end of the tests implemented by {@link Publisher}. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Count messages received on a topic. <td> {@link Publisher} + * <tr><td> Send reports on messages received, when requested to. <td> {@link Publisher} + * <tr><td> Shutdown, when requested to. <td> {@link Publisher} + * <tr><td> + * + * @todo This doesn't implement the interop test spec yet. It's a port of the old topic tests but has been adapted with + * interop spec in mind. + * + * @todo I've added lots of field table types in the report message, just to check if the other end can decode them + * correctly. Not really the right place to test this, so remove them from {@link #sendReport()} once a better + * test exists. + */ +public class Listener implements MessageListener +{ + private static Logger log = Logger.getLogger(Listener.class); + + /** The default AMQ connection URL to use for tests. */ + public static final String DEFAULT_URI = "amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://localhost:5672'"; + + /** Holds the name of (routing key for) the topic to receive test messages on. */ + public static final String CONTROL_TOPIC = "topic_control"; + + /** Holds the name of (routing key for) the queue to send reports to. */ + public static final String RESPONSE_QUEUE = "response"; + + /** Holds the JMS Topic to receive test messages on. */ + private final Topic _topic; + + /** Holds the JMS Queue to send reports to. */ + private final Queue _response; + + /** Holds the connection to listen on. */ + private final Connection _connection; + + /** Holds the producer to send control messages on. */ + private final MessageProducer _controller; + + /** Holds the JMS session.
*/ + private final javax.jms.Session _session; + + /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ + private boolean init; + + /** Holds the count of messages received by this listener. */ + private int count; + + /** Used to hold the start time of the first message. */ + private long start; + + /** + * Creates a topic listener using the specified broker URL. + * + * @param connectionUrl The broker URL to listen on. + * + * @throws AMQException If the broker connection cannot be established. + * @throws URLSyntaxException If the broker URL syntax is not correct. + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + Listener(String connectionUrl) throws AMQException, JMSException, URLSyntaxException + { + log.debug("Listener(String connectionUrl = " + connectionUrl + "): called"); + + // Create a connection to the broker. + _connection = new AMQConnection(connectionUrl); + + // Establish a session on the broker. + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set up the destinations to listen for test and control messages on. + _topic = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); + + // Set this listener up to listen for incoming messages on the test topic. + _session.createConsumer(_topic).setMessageListener(this); + + // Set up this listener with a producer to send the reports on. + _controller = _session.createProducer(_response); + + _connection.start(); + System.out.println("Waiting for messages..."); + } + + /** + * Starts a test subscriber. The broker URL must be specified as the first command line argument. + * + * @param argv The command line arguments, ignored. + * + * @todo Add command line arguments to configure all aspects of the test. 
+ */ + public static void main(String[] argv) + { + try + { + new Listener(DEFAULT_URI); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + /** + * Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and + * shutdown messages result in this listener being terminated. + * + * @param message The received message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Take the start time of the first message if this is the first message. + if (!init) + { + start = System.nanoTime() / 1000000; + count = 0; + init = true; + } + + try + { + // Check if the message is a control message telling this listener to shut down. + if (isShutdown(message)) + { + log.debug("Got a shutdown message."); + shutdown(); + } + // Check if the message is a report request message asking this listener to respond with the message count. + else if (isReport(message)) + { + log.debug("Got a report request message."); + + // Send the message count report. + sendReport(); + + // Reset the initialization flag so that the next message is considered to be the first. + init = false; + } + // Otherwise it is an ordinary test message, so increment the message count. + else + { + count++; + } + } + catch (JMSException e) + { + log.warn("There was a JMSException during onMessage.", e); + } + } + + /** + * Checks a message to see if it is a termination request control message. + * + * @param m The message to check. + * + * @return <tt>true</tt> if it is a termination request control message, <tt>false</tt> otherwise. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + boolean isShutdown(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); + + return result; + } + + /** + * Checks a message to see if it is a report request control message. 
+ * + * @param m The message to check. + * + * @return <tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + boolean isReport(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); + + return result; + } + + /** + * Checks whether or not a text field on a message has the specified value. + * + * @param m The message to check. + * @param fieldName The name of the field to check. + * @param value The expected value of the field to compare with. + * + * @return <tt>true</tt> if the specified field has the specified value, <tt>false</tt> otherwise. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException + { + //log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + // + ", String value = " + value + "): called"); + + String comp = m.getStringProperty(fieldName); + //log.debug("comp = " + comp); + + boolean result = (comp != null) && comp.equals(value); + //log.debug("result = " + result); + + return result; + } + + /** + * Closes down the connection to the broker. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + private void shutdown() throws JMSException + { + _session.close(); + _connection.stop(); + _connection.close(); + } + + /** + * Send the report message to the response queue. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + private void sendReport() throws JMSException + { + log.debug("private void report(): called"); + + // Create the report message.
+ long time = ((System.nanoTime() / 1000000) - start); + String msg = "Received " + count + " in " + time + "ms"; + Message message = _session.createTextMessage(msg); + + // Shove some more field table types in the message just to see if the other end can handle it. + message.setBooleanProperty("BOOLEAN", true); + //message.setByteProperty("BYTE", (byte) 5); + message.setDoubleProperty("DOUBLE", Math.PI); + message.setFloatProperty("FLOAT", 1.0f); + message.setIntProperty("INT", 1); + message.setShortProperty("SHORT", (short) 1); + message.setLongProperty("LONG", (long) 1827361278); + message.setStringProperty("STRING", "hello"); + + // Send the report message. + _controller.send(message); + log.debug("Sent report: " + msg); + } +}
Added: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java?view=auto&rev=512699 ============================================================================== --- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java (added) +++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java Wed Feb 28 03:05:20 2007 @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Publisher is the sending end of Qpid interop tests. It is capable of being run as a standalone publisher + * that sends test messages to the listening end of the tests implemented by {@link Listener}.
+ * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> + * + * @todo This doesn't implement the interop test spec yet. It's a port of the old topic tests but has been adapted with + * interop spec in mind. + * + * @todo I've added lots of field table types in the report request message, just to check if the other end can decode + * them correctly. Not really the right place to test this, so remove them from {@link #doTest()} once a better + * test exists. + */ +public class Publisher implements MessageListener +{ + private static Logger log = Logger.getLogger(Publisher.class); + + /** The default AMQ connection URL to use for tests. */ + public static final String DEFAULT_URI = "amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://localhost:5672'"; + + /** Holds the default test timeout for broker communications before tests give up. */ + public static final int TIMEOUT = 3000; + + /** Holds the routing key for the topic to send test messages on. */ + public static final String CONTROL_TOPIC = "topic_control"; + + /** Holds the routing key for the queue to receive reports on. */ + public static final String RESPONSE_QUEUE = "response"; + + /** Holds the JMS Topic to send test messages on. */ + private final Topic _topic; + + /** Holds the JMS Queue to receive reports on. */ + private final Queue _response; + + /** Holds the number of messages to send in each test run. */ + private int numMessages; + + /** A monitor used to wait for all reports to arrive back from consumers on. */ + private CountDownLatch allReportsReceivedEvt; + + /** Holds the connection to listen on. */ + private Connection _connection; + + /** Holds the channel for all test messages.*/ + private Session _session; + + /** Holds the producer to send test messages on.
*/ + private MessageProducer publisher; + + /** + * Creates a topic publisher that will send the specified number of messages and expect the specified number of reports back from test + * subscribers. + * + * @param connectionUri The broker URL. + * @param numMessages The number of messages to send in each test. + * @param numSubscribers The number of subscribers that are expected to reply with a report. + */ + Publisher(String connectionUri, int numMessages, int numSubscribers) + throws AMQException, JMSException, URLSyntaxException + { + log.debug("Publisher(String connectionUri = " + connectionUri + ", int numMessages = " + numMessages + + ", int numSubscribers = " + numSubscribers + "): called"); + + // Create a connection to the broker. + _connection = new AMQConnection(connectionUri); + + // Establish a session on the broker. + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Set up the destinations to send test messages and listen for reports on. + _topic = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); + + // Set this listener up to listen for reports on the response queue. + _session.createConsumer(_response).setMessageListener(this); + + // Set up this listener with a producer to send the test messages and report requests on. + publisher = _session.createProducer(_topic); + + // Keep the test parameters. + this.numMessages = numMessages; + + // Set up a countdown to count all subscribers sending their reports. + allReportsReceivedEvt = new CountDownLatch(numSubscribers); + + _connection.start(); + System.out.println("Sending messages and waiting for reports..."); + } + + /** + * Start a test publisher. The broker URL must be specified as the first command line argument. + * + * @param argv The command line arguments, ignored. + * + * @todo Add command line arguments to configure all aspects of the test.
+ */ + public static void main(String[] argv) + { + try + { + // Create an instance of this publisher with the command line parameters. + Publisher publisher = new Publisher(DEFAULT_URI, 1, 1); + + // Publish the test messages. + publisher.doTest(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + /** + * Sends the test messages and waits for all subscribers to reply with a report. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void doTest() throws JMSException + { + log.debug("public void DoTest(): called"); + + // Create a test message to send. + Message testMessage = _session.createTextMessage("test"); + + // Send the desired number of test messages. + for (int i = 0; i < numMessages; i++) + { + publisher.send(testMessage); + } + + log.debug("Sent " + numMessages + " test messages."); + + // Send the report request. + Message reportRequestMessage = _session.createTextMessage("Report request message."); + reportRequestMessage.setStringProperty("TYPE", "REPORT_REQUEST"); + + reportRequestMessage.setBooleanProperty("BOOLEAN", false); + //reportRequestMessage.Headers.SetByte("BYTE", 5); + reportRequestMessage.setDoubleProperty("DOUBLE", 3.141); + reportRequestMessage.setFloatProperty("FLOAT", 1.0f); + reportRequestMessage.setIntProperty("INT", 1); + reportRequestMessage.setLongProperty("LONG", 1); + reportRequestMessage.setStringProperty("STRING", "hello"); + reportRequestMessage.setShortProperty("SHORT", (short) 2); + + publisher.send(reportRequestMessage); + + log.debug("Sent the report request message, waiting for all replies..."); + + // Wait until all the reports come in. + try + { + allReportsReceivedEvt.await(TIMEOUT, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { } + + // Check if all reports were really received or if the timeout occurred. 
+ if (allReportsReceivedEvt.getCount() == 0) + { + log.debug("Got all reports."); + } + else + { + log.debug("Waiting for reports timed out, still waiting for " + allReportsReceivedEvt.getCount() + "."); + } + + // Send the termination request. + Message terminationRequestMessage = _session.createTextMessage("Termination request message."); + terminationRequestMessage.setStringProperty("TYPE", "TERMINATION_REQUEST"); + publisher.send(terminationRequestMessage); + + log.debug("Sent the termination request message."); + + // Close all message producers and consumers and the connection to the broker. + shutdown(); + } + + /** + * Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes + * zero, at which time waiting threads are notified of this event. + * + * @param message The received report message. + */ + public void onMessage(Message message) + { + log.debug("public void OnMessage(Message message = " + message + "): called"); + + // Decrement the count of expected messages and release the wait monitor when this becomes zero. + allReportsReceivedEvt.countDown(); + + if (allReportsReceivedEvt.getCount() == 0) + { + log.debug("Got reports from all subscribers."); + } + } + + /** + * Stops the message consumers and closes the connection. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. 
+ */ + private void shutdown() throws JMSException + { + _session.close(); + _connection.close(); + } +} Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java?view=diff&rev=512699&r1=512698&r2=512699 ============================================================================== --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java (original) +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java Wed Feb 28 03:05:20 2007 @@ -47,18 +47,14 @@ * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", * which the listener should close its connection and terminate upon receipt of. * - * @deprecated Use PingPongBouncer instead once the below todo is completed. - * - * @todo Make the functionality of this class available through PingPongBouncer. Rename PingPongBouncer to - * PingListener and make its bouncing functionality optional, either through a switch or as an extending class - * called PingBouncer. Want to have as few ping classes as possible with configurable behaviour, re-using code - * accross p2p and topic style tests in almost all cases. + * @todo I've added lots of field table types in the report message, just to check if the other end can decode them + * correctly. Not really the right place to test this, so remove them from + * {@link #createReportResponseMessage(String)} once a better test exists.
*/ public class Listener implements MessageListener { private static Logger log = Logger.getLogger(Listener.class); - private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); public static final String CONTROL_TOPIC = "topic_control"; public static final String RESPONSE_QUEUE = "response"; @@ -67,8 +63,6 @@ private final Queue _response; - private final byte[] _payload; - /** Holds the connection to listen on. */ private final Connection _connection; @@ -109,15 +103,6 @@ _response = _session.createQueue(RESPONSE_QUEUE); } - int size = 256; - - _payload = new byte[size]; - - for (int i = 0; i < size; i++) - { - _payload[i] = (byte) DATA[i % DATA.length]; - } - //register for events if (name == null) { @@ -182,15 +167,19 @@ + ", String value = " + value + "): called"); String comp = m.getStringProperty(fieldName); + log.debug("comp = " + comp); - return (comp != null) && comp.equals(value); + boolean result = (comp != null) && comp.equals(value); + log.debug("result = " + result); + + return result; } public void onMessage(Message message) { NDC.push(clientId); - log.debug("public void onMessage(Message message): called"); + log.debug("public void onMessage(Message message = " + message + "): called"); if (!init) { @@ -203,11 +192,14 @@ { if (isShutdown(message)) { + log.debug("Got a shutdown message."); shutdown(); } else if (isReport(message)) { - //send a report: + log.debug("Got a report request message."); + + // Send the report. report(); init = false; } @@ -224,14 +216,26 @@ Message createReportResponseMessage(String msg) throws JMSException { - return _session.createTextMessage(msg); + Message message = _session.createTextMessage(msg); + + // Shove some more field table type in the message just to see if the other end can handle it. 
+ message.setBooleanProperty("BOOLEAN", true); + message.setByteProperty("BYTE", (byte) 5); + message.setDoubleProperty("DOUBLE", Math.PI); + message.setFloatProperty("FLOAT", 1.0f); + message.setIntProperty("INT", 1); + message.setShortProperty("SHORT", (short) 1); + message.setLongProperty("LONG", (long) 1827361278); + message.setStringProperty("STRING", "hello"); + + return message; } boolean isShutdown(Message m) throws JMSException { boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); - log.debug("isShutdown = " + result); + //log.debug("isShutdown = " + result); return result; } @@ -240,7 +244,7 @@ { boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); - log.debug("isReport = " + result); + //log.debug("isReport = " + result); return result; } @@ -276,11 +280,13 @@ private void report() { + log.debug("private void report(): called"); + try { String msg = getReport(); _controller.send(createReportResponseMessage(msg)); - System.out.println("Sent report: " + msg); + log.debug("Sent report: " + msg); } catch (Exception e) { Modified: incubator/qpid/trunk/qpid/java/pom.xml URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=512699&r1=512698&r2=512699 ============================================================================== --- incubator/qpid/trunk/qpid/java/pom.xml (original) +++ incubator/qpid/trunk/qpid/java/pom.xml Wed Feb 28 03:05:20 2007 @@ -142,6 +142,7 @@ <module>cluster</module> <module>systests</module> <module>perftests</module> + <module>integrationtests</module> <module>management/eclipse-plugin</module> <module>client/example</module> <module>client-java14</module>
