Propchange:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=651325&r1=651324&r2=651325&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
Thu Apr 24 10:49:03 2008
@@ -1,453 +1,453 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.jms.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.topic.Config;
-import org.apache.qpid.exchange.ExchangeDefaults;
-
-/**
- * PingPongBouncer is a message listener the bounces back messages to their
reply to destination. This is used to return
- * ping messages generated by {@link
org.apache.qpid.requestreply.PingPongProducer} but could be used for other
purposes
- * too.
- *
- * <p/>The correlation id from the received message is extracted, and placed
into the reply as the correlation id. Messages
- * are bounced back to their reply-to destination. The original sender of the
message has the option to use either a unique
- * temporary queue or the correlation id to correlate the original message to
the reply.
- *
- * <p/>There is a verbose mode flag which causes information about each ping
to be output to the console
- * (info level logging, so usually console). This can be helpfull to check the
bounce backs are happening but should
- * be disabled for real timing tests as writing to the console will slow
things down.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Bounce back messages to their reply to destination.
- * <tr><td> Provide command line invocation to start the bounce back on a
configurable broker url.
- * </table>
- *
- * @todo Replace the command line parsing with a neater tool.
- *
- * @todo Make verbose accept a number of messages, only prints to console
every X messages.
- */
-public class PingPongBouncer implements MessageListener
-{
- private static final Logger _logger =
Logger.getLogger(PingPongBouncer.class);
-
- /** The default prefetch size for the message consumer. */
- private static final int PREFETCH = 1;
-
- /** The default no local flag for the message consumer. */
- private static final boolean NO_LOCAL = true;
-
- private static final String DEFAULT_DESTINATION_NAME = "ping";
-
- /** The default exclusive flag for the message consumer. */
- private static final boolean EXCLUSIVE = false;
-
- /** A convenient formatter to use when time stamping output. */
- protected static final SimpleDateFormat timestampFormatter = new
SimpleDateFormat("hh:mm:ss:SS");
-
- /** Used to indicate that the reply generator should log timing info to
the console (logger info level). */
- private boolean _verbose = false;
-
- /** Determines whether this bounce back client bounces back messages
persistently. */
- private boolean _persistent = false;
-
- private Destination _consumerDestination;
-
- /** Keeps track of the response destination of the previous message for
the last reply to producer cache. */
- private Destination _lastResponseDest;
-
- /** The producer for sending replies with. */
- private MessageProducer _replyProducer;
-
- /** The consumer controlSession. */
- private Session _consumerSession;
-
- /** The producer controlSession. */
- private Session _producerSession;
-
- /** Holds the connection to the broker. */
- private AMQConnection _connection;
-
- /** Flag used to indicate if this is a point to point or pub/sub ping
client. */
- private boolean _isPubSub = false;
-
- /**
- * This flag is used to indicate that the user should be prompted to kill
a broker, in order to test
- * failover, immediately before committing a transaction.
- */
- protected boolean _failBeforeCommit = false;
-
- /**
- * This flag is used to indicate that the user should be prompted to a
kill a broker, in order to test
- * failover, immediate after committing a transaction.
- */
- protected boolean _failAfterCommit = false;
-
- /**
- * Creates a PingPongBouncer on the specified producer and consumer
sessions.
- *
- * @param brokerDetails The addresses of the brokers to connect to.
- * @param username The broker username.
- * @param password The broker password.
- * @param virtualpath The virtual host name within the broker.
- * @param destinationName The name of the queue to receive pings on
- * (or root of the queue name where many queues are
generated).
- * @param persistent A flag to indicate that persistent message
should be used.
- * @param transacted A flag to indicate that pings should be sent
within transactions.
- * @param selector A message selector to filter received pings with.
- * @param verbose A flag to indicate that message timings should
be sent to the console.
- *
- * @throws Exception All underlying exceptions allowed to fall through.
This is only test code...
- */
- public PingPongBouncer(String brokerDetails, String username, String
password, String virtualpath,
- String destinationName, boolean persistent, boolean
transacted, String selector, boolean verbose,
- boolean pubsub) throws Exception
- {
- // Create a client id to uniquely identify this client.
- InetAddress address = InetAddress.getLocalHost();
- String clientId = address.getHostName() + System.currentTimeMillis();
- _verbose = verbose;
- _persistent = persistent;
- setPubSub(pubsub);
- // Connect to the broker.
- setConnection(new AMQConnection(brokerDetails, username, password,
clientId, virtualpath));
- _logger.info("Connected with URL:" + getConnection().toURL());
-
- // Set up the failover notifier.
- getConnection().setConnectionListener(new FailoverNotifier());
-
- // Create a controlSession to listen for messages on and one to send
replies on, transactional depending on the
- // command line option.
- _consumerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
- _producerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
-
- // Create the queue to listen for message on.
- createConsumerDestination(destinationName);
- MessageConsumer consumer =
- _consumerSession.createConsumer(_consumerDestination, PREFETCH,
NO_LOCAL, EXCLUSIVE, selector);
-
- // Create a producer for the replies, without a default destination.
- _replyProducer = _producerSession.createProducer(null);
- _replyProducer.setDisableMessageTimestamp(true);
- _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT);
-
- // Set this up to listen for messages on the queue.
- consumer.setMessageListener(this);
- }
-
- /**
- * Starts a stand alone ping-pong client running in verbose mode.
- *
- * @param args
- */
- public static void main(String[] args) throws Exception
- {
- System.out.println("Starting...");
-
- // Display help on the command line.
- if (args.length == 0)
- {
- _logger.info("Running test with default values...");
- //usage();
- //System.exit(0);
- }
-
- // Extract all command line parameters.
- Config config = new Config();
- config.setOptions(args);
- String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
- String destinationName = config.getDestination();
- if (destinationName == null)
- {
- destinationName = DEFAULT_DESTINATION_NAME;
- }
-
- String selector = config.getSelector();
- boolean transacted = config.isTransacted();
- boolean persistent = config.usePersistentMessages();
- boolean pubsub = config.isPubSub();
- boolean verbose = true;
-
- //String selector = null;
-
- // Instantiate the ping pong client with the command line options and
start it running.
- PingPongBouncer pingBouncer =
- new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath,
destinationName, persistent, transacted,
- selector, verbose, pubsub);
- pingBouncer.getConnection().start();
-
- System.out.println("Waiting...");
- }
-
- private static void usage()
- {
- System.err.println("Usage: PingPongBouncer \n" + "-host : broker
host\n" + "-port : broker port\n"
- + "-destinationname : queue/topic name\n" +
"-transacted : (true/false). Default is false\n"
- + "-persistent : (true/false). Default is false\n"
- + "-pubsub : (true/false). Default is false\n"
+ "-selector : selector string\n");
- }
-
- /**
- * This is a callback method that is notified of all messages for which
this has been registered as a message
- * listener on a message consumer. It sends a reply (pong) to all messages
it receieves on the reply to
- * destination of the message.
- *
- * @param message The message that triggered this callback.
- */
- public void onMessage(Message message)
- {
- try
- {
- String messageCorrelationId = message.getJMSCorrelationID();
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Got
ping with correlation id, "
- + messageCorrelationId);
- }
-
- // Get the reply to destination from the message and check it is
set.
- Destination responseDest = message.getJMSReplyTo();
-
- if (responseDest == null)
- {
- _logger.debug("Cannot send reply because reply-to destination
is null.");
-
- return;
- }
-
- // Spew out some timing information if verbose mode is on.
- if (_verbose)
- {
- Long timestamp = message.getLongProperty("timestamp");
-
- if (timestamp != null)
- {
- long diff = System.currentTimeMillis() - timestamp;
- _logger.info("Time to bounce point: " + diff);
- }
- }
-
- // Correlate the reply to the original.
- message.setJMSCorrelationID(messageCorrelationId);
-
- // Send the receieved message as the pong reply.
- _replyProducer.send(responseDest, message);
-
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Sent
reply with correlation id, "
- + messageCorrelationId);
- }
-
- // Commit the transaction if running in transactional mode.
- commitTx(_producerSession);
- }
- catch (JMSException e)
- {
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
- }
- }
-
- /**
- * Gets the underlying connection that this ping client is running on.
- *
- * @return The underlying connection that this ping client is running on.
- */
- public AMQConnection getConnection()
- {
- return _connection;
- }
-
- /**
- * Sets the connection that this ping client is using.
- *
- * @param connection The ping connection.
- */
- public void setConnection(AMQConnection connection)
- {
- this._connection = connection;
- }
-
- /**
- * Sets or clears the pub/sub flag to indiciate whether this client is
pinging a queue or a topic.
- *
- * @param pubsub <tt>true</tt> if this client is pinging a topic,
<tt>false</tt> if it is pinging a queue.
- */
- public void setPubSub(boolean pubsub)
- {
- _isPubSub = pubsub;
- }
-
- /**
- * Checks whether this client is a p2p or pub/sub ping client.
- *
- * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt>
if it is pinging a queue.
- */
- public boolean isPubSub()
- {
- return _isPubSub;
- }
-
- /**
- * Convenience method to commit the transaction on the specified
controlSession. If the controlSession to commit on is not
- * a transactional controlSession, this method does nothing.
- *
- * <p/>If the {@link #_failBeforeCommit} flag is set, this will
prompt the user to kill the broker before the
- * commit is applied. If the {@link #_failAfterCommit} flag is
set, this will prompt the user to kill the broker
- * after the commit is applied.
- *
- * @throws javax.jms.JMSException If the commit fails and then the
rollback fails.
- */
- protected void commitTx(Session session) throws JMSException
- {
- if (session.getTransacted())
- {
- try
- {
- if (_failBeforeCommit)
- {
- _logger.debug("Failing Before Commit");
- doFailover();
- }
-
- session.commit();
-
- if (_failAfterCommit)
- {
- _logger.debug("Failing After Commit");
- doFailover();
- }
-
- _logger.debug("Session Commited.");
- }
- catch (JMSException e)
- {
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
-
- try
- {
- session.rollback();
- _logger.debug("Message rolled back.");
- }
- catch (JMSException jmse)
- {
- _logger.trace("JMSE on rollback:" + jmse.getMessage(),
jmse);
-
- // Both commit and rollback failed. Throw the rollback
exception.
- throw jmse;
- }
- }
- }
- }
-
- /**
- * Prompts the user to terminate the named broker, in order to test
failover functionality. This method will block
- * until the user supplied some input on the terminal.
- *
- * @param broker The name of the broker to terminate.
- */
- protected void doFailover(String broker)
- {
- System.out.println("Kill Broker " + broker + " now.");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
- }
-
- /**
- * Prompts the user to terminate the broker, in order to test failover
functionality. This method will block
- * until the user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now.");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
-
- }
-
- private void createConsumerDestination(String name)
- {
- if (isPubSub())
- {
- _consumerDestination = new
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
- }
- else
- {
- _consumerDestination = new
AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
- }
- }
-
- /**
- * A connection listener that logs out any failover complete events. Could
do more interesting things with this
- * at some point...
- */
- public static class FailoverNotifier implements ConnectionListener
- {
- public void bytesSent(long count)
- { }
-
- public void bytesReceived(long count)
- { }
-
- public boolean preFailover(boolean redirect)
- {
- return true;
- }
-
- public boolean preResubscribe()
- {
- return true;
- }
-
- public void failoverComplete()
- {
- _logger.info("App got failover complete callback.");
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+/**
+ * PingPongBouncer is a message listener the bounces back messages to their
reply to destination. This is used to return
+ * ping messages generated by {@link
org.apache.qpid.requestreply.PingPongProducer} but could be used for other
purposes
+ * too.
+ *
+ * <p/>The correlation id from the received message is extracted, and placed
into the reply as the correlation id. Messages
+ * are bounced back to their reply-to destination. The original sender of the
message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to
the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping
to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the
bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow
things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a
configurable broker url.
+ * </table>
+ *
+ * @todo Replace the command line parsing with a neater tool.
+ *
+ * @todo Make verbose accept a number of messages, only prints to console
every X messages.
+ */
+public class PingPongBouncer implements MessageListener
+{
+ private static final Logger _logger =
Logger.getLogger(PingPongBouncer.class);
+
+ /** The default prefetch size for the message consumer. */
+ private static final int PREFETCH = 1;
+
+ /** The default no local flag for the message consumer. */
+ private static final boolean NO_LOCAL = true;
+
+ private static final String DEFAULT_DESTINATION_NAME = "ping";
+
+ /** The default exclusive flag for the message consumer. */
+ private static final boolean EXCLUSIVE = false;
+
+ /** A convenient formatter to use when time stamping output. */
+ protected static final SimpleDateFormat timestampFormatter = new
SimpleDateFormat("hh:mm:ss:SS");
+
+ /** Used to indicate that the reply generator should log timing info to
the console (logger info level). */
+ private boolean _verbose = false;
+
+ /** Determines whether this bounce back client bounces back messages
persistently. */
+ private boolean _persistent = false;
+
+ private Destination _consumerDestination;
+
+ /** Keeps track of the response destination of the previous message for
the last reply to producer cache. */
+ private Destination _lastResponseDest;
+
+ /** The producer for sending replies with. */
+ private MessageProducer _replyProducer;
+
+ /** The consumer controlSession. */
+ private Session _consumerSession;
+
+ /** The producer controlSession. */
+ private Session _producerSession;
+
+ /** Holds the connection to the broker. */
+ private AMQConnection _connection;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping
client. */
+ private boolean _isPubSub = false;
+
+ /**
+ * This flag is used to indicate that the user should be prompted to kill
a broker, in order to test
+ * failover, immediately before committing a transaction.
+ */
+ protected boolean _failBeforeCommit = false;
+
+ /**
+ * This flag is used to indicate that the user should be prompted to a
kill a broker, in order to test
+ * failover, immediate after committing a transaction.
+ */
+ protected boolean _failAfterCommit = false;
+
+ /**
+ * Creates a PingPongBouncer on the specified producer and consumer
sessions.
+ *
+ * @param brokerDetails The addresses of the brokers to connect to.
+ * @param username The broker username.
+ * @param password The broker password.
+ * @param virtualpath The virtual host name within the broker.
+ * @param destinationName The name of the queue to receive pings on
+ * (or root of the queue name where many queues are
generated).
+ * @param persistent A flag to indicate that persistent message
should be used.
+ * @param transacted A flag to indicate that pings should be sent
within transactions.
+ * @param selector A message selector to filter received pings with.
+ * @param verbose A flag to indicate that message timings should
be sent to the console.
+ *
+ * @throws Exception All underlying exceptions allowed to fall through.
This is only test code...
+ */
+ public PingPongBouncer(String brokerDetails, String username, String
password, String virtualpath,
+ String destinationName, boolean persistent, boolean
transacted, String selector, boolean verbose,
+ boolean pubsub) throws Exception
+ {
+ // Create a client id to uniquely identify this client.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientId = address.getHostName() + System.currentTimeMillis();
+ _verbose = verbose;
+ _persistent = persistent;
+ setPubSub(pubsub);
+ // Connect to the broker.
+ setConnection(new AMQConnection(brokerDetails, username, password,
clientId, virtualpath));
+ _logger.info("Connected with URL:" + getConnection().toURL());
+
+ // Set up the failover notifier.
+ getConnection().setConnectionListener(new FailoverNotifier());
+
+ // Create a controlSession to listen for messages on and one to send
replies on, transactional depending on the
+ // command line option.
+ _consumerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) getConnection().createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create the queue to listen for message on.
+ createConsumerDestination(destinationName);
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(_consumerDestination, PREFETCH,
NO_LOCAL, EXCLUSIVE, selector);
+
+ // Create a producer for the replies, without a default destination.
+ _replyProducer = _producerSession.createProducer(null);
+ _replyProducer.setDisableMessageTimestamp(true);
+ _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT);
+
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws Exception
+ {
+ System.out.println("Starting...");
+
+ // Display help on the command line.
+ if (args.length == 0)
+ {
+ _logger.info("Running test with default values...");
+ //usage();
+ //System.exit(0);
+ }
+
+ // Extract all command line parameters.
+ Config config = new Config();
+ config.setOptions(args);
+ String brokerDetails = config.getHost() + ":" + config.getPort();
+ String virtualpath = "test";
+ String destinationName = config.getDestination();
+ if (destinationName == null)
+ {
+ destinationName = DEFAULT_DESTINATION_NAME;
+ }
+
+ String selector = config.getSelector();
+ boolean transacted = config.isTransacted();
+ boolean persistent = config.usePersistentMessages();
+ boolean pubsub = config.isPubSub();
+ boolean verbose = true;
+
+ //String selector = null;
+
+ // Instantiate the ping pong client with the command line options and
start it running.
+ PingPongBouncer pingBouncer =
+ new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath,
destinationName, persistent, transacted,
+ selector, verbose, pubsub);
+ pingBouncer.getConnection().start();
+
+ System.out.println("Waiting...");
+ }
+
+ private static void usage()
+ {
+ System.err.println("Usage: PingPongBouncer \n" + "-host : broker
host\n" + "-port : broker port\n"
+ + "-destinationname : queue/topic name\n" +
"-transacted : (true/false). Default is false\n"
+ + "-persistent : (true/false). Default is false\n"
+ + "-pubsub : (true/false). Default is false\n"
+ "-selector : selector string\n");
+ }
+
+ /**
+ * This is a callback method that is notified of all messages for which
this has been registered as a message
+ * listener on a message consumer. It sends a reply (pong) to all messages
it receieves on the reply to
+ * destination of the message.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ String messageCorrelationId = message.getJMSCorrelationID();
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Got
ping with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Get the reply to destination from the message and check it is
set.
+ Destination responseDest = message.getJMSReplyTo();
+
+ if (responseDest == null)
+ {
+ _logger.debug("Cannot send reply because reply-to destination
is null.");
+
+ return;
+ }
+
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Time to bounce point: " + diff);
+ }
+ }
+
+ // Correlate the reply to the original.
+ message.setJMSCorrelationID(messageCorrelationId);
+
+ // Send the receieved message as the pong reply.
+ _replyProducer.send(responseDest, message);
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Sent
reply with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Commit the transaction if running in transactional mode.
+ commitTx(_producerSession);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
+ public AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Sets the connection that this ping client is using.
+ *
+ * @param connection The ping connection.
+ */
+ public void setConnection(AMQConnection connection)
+ {
+ this._connection = connection;
+ }
+
+ /**
+ * Sets or clears the pub/sub flag to indiciate whether this client is
pinging a queue or a topic.
+ *
+ * @param pubsub <tt>true</tt> if this client is pinging a topic,
<tt>false</tt> if it is pinging a queue.
+ */
+ public void setPubSub(boolean pubsub)
+ {
+ _isPubSub = pubsub;
+ }
+
+ /**
+ * Checks whether this client is a p2p or pub/sub ping client.
+ *
+ * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt>
if it is pinging a queue.
+ */
+ public boolean isPubSub()
+ {
+ return _isPubSub;
+ }
+
+ /**
+ * Convenience method to commit the transaction on the specified
controlSession. If the controlSession to commit on is not
+ * a transactional controlSession, this method does nothing.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will
prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is
set, this will prompt the user to kill the broker
+ * after the commit is applied.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the
rollback fails.
+ */
+ protected void commitTx(Session session) throws JMSException
+ {
+ if (session.getTransacted())
+ {
+ try
+ {
+ if (_failBeforeCommit)
+ {
+ _logger.debug("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ _logger.debug("Failing After Commit");
+ doFailover();
+ }
+
+ _logger.debug("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ try
+ {
+ session.rollback();
+ _logger.debug("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(),
jmse);
+
+ // Both commit and rollback failed. Throw the rollback
exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+
+ /**
+ * Prompts the user to terminate the named broker, in order to test
failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ *
+ * @param broker The name of the broker to terminate.
+ */
+ protected void doFailover(String broker)
+ {
+ System.out.println("Kill Broker " + broker + " now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+ }
+
+ /**
+ * Prompts the user to terminate the broker, in order to test failover
functionality. This method will block
+ * until the user supplied some input on the terminal.
+ */
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+
+ }
+
+ private void createConsumerDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _consumerDestination = new
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
+ }
+ else
+ {
+ _consumerDestination = new
AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
+ }
+ }
+
+ /**
+ * A connection listener that logs out any failover complete events. Could
do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
+ }
+ }
+}
Propchange:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
------------------------------------------------------------------------------
svn:eol-style = native