Modified: incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=568924&r1=568923&r2=568924&view=diff ============================================================================== --- incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original) +++ incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java Thu Aug 23 03:37:59 2007 @@ -20,15 +20,6 @@ */ package org.apache.qpid.ping; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - import junit.framework.Test; import junit.framework.TestSuite; @@ -40,6 +31,15 @@ import uk.co.thebadgerset.junit.extensions.TimingControllerAware; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + /** * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller * interface supplied by the test runner from a seperate listener thread. It differs from the [EMAIL PROTECTED] PingTestPerf} test @@ -239,7 +239,7 @@ * * @throws JMSException Any underlying JMSException is allowed to fall through. */ - public void onMessage(Message message, int remainingCount) throws JMSException + public void onMessage(Message message, int remainingCount, long latency) throws JMSException { // Check if a batch boundary has been crossed. if ((remainingCount % _batchSize) == 0)
Modified: incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?rev=568924&r1=568923&r2=568924&view=diff ============================================================================== --- incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original) +++ incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Thu Aug 23 03:37:59 2007 @@ -95,13 +95,13 @@ { log.debug("1 consumer per destination."); - return 1; + return _noOfConsumers; } else { log.debug(_pingClientCount + " consumers per destination."); - return _pingClientCount; + return _pingClientCount * _noOfConsumers; } } } Modified: incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=568924&r1=568923&r2=568924&view=diff ============================================================================== --- incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (original) +++ incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Thu Aug 23 03:37:59 2007 @@ -360,7 +360,7 @@ } // Ensure messages received are committed. - if (_transacted) + if (_consTransacted) { try { Modified: incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java?rev=568924&r1=568923&r2=568924&view=diff ============================================================================== --- incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java (original) +++ incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java Thu Aug 23 03:37:59 2007 @@ -20,15 +20,6 @@ */ package org.apache.qpid.ping; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - import junit.framework.Test; import junit.framework.TestSuite; @@ -43,6 +34,15 @@ import uk.co.thebadgerset.junit.extensions.TimingControllerAware; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + /** * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for @@ -261,7 +261,7 @@ * * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. */ - public void onMessage(Message message, int remainingCount) throws JMSException + public void onMessage(Message message, int remainingCount, long latency) throws JMSException { _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); @@ -280,26 +280,6 @@ TimingController tc = perCorrelationId._tc; int expected = perCorrelationId._expectedCount; - // Extract the send time from the message and work out from the current time, what the ping latency was. - // The ping producer time stamps messages in nanoseconds. - long startTime; - - if (_strictAMQP) - { - Long value = - ((AMQMessage) message).getTimestampProperty(new AMQShortString( - PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME)); - - startTime = ((value == null) ? 0L : value); - } - else - { - startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - } - - long now = System.nanoTime(); - long pingTime = now - startTime; - // Calculate how many messages were actually received in the last batch. This will be the batch size // except where the number expected is not a multiple of the batch size and this is the first remaining // count to cross a batch size boundary, in which case it will be the number expected modulo the batch @@ -309,8 +289,7 @@ // Register a test result for the correlation id. try { - - tc.completeTest(true, receivedInBatch, pingTime); + tc.completeTest(true, receivedInBatch, latency); } catch (InterruptedException e) { Modified: incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=568924&r1=568923&r2=568924&view=diff ============================================================================== --- incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original) +++ incubator/qpid/branches/M2.0.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Aug 23 03:37:59 2007 @@ -21,6 +21,7 @@ package org.apache.qpid.requestreply; import org.apache.log4j.Logger; +import org.apache.log4j.NDC; import org.apache.qpid.AMQException; import org.apache.qpid.client.*; @@ -97,6 +98,10 @@ * 3 - DUPS_OK_ACKNOWLEDGE * 257 - NO_ACKNOWLEDGE * 258 - PRE_ACKNOWLEDGE + * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value + * as the 'transacted' option if not seperately defined. + * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same + * value as 'ackMode' if not seperately defined. * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received. * Limits the volume of messages currently buffered on the client * or broker. Can help scale test clients by limiting amount of buffered @@ -132,6 +137,7 @@ */ public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener { + /** Used for debugging. */ private static final Logger log = Logger.getLogger(PingPongProducer.class); /** Holds the name of the property to get the test message size from. */ @@ -158,6 +164,9 @@ /** Holds the transactional mode to use for the test. */ public static final boolean TRANSACTED_DEFAULT = false; + public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; + public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; + /** Holds the name of the property to get the test broker url from. */ public static final String BROKER_PROPNAME = "broker"; @@ -275,6 +284,9 @@ /** Defines the default message acknowledgement mode. */ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; + public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + public static final String MAX_PENDING_PROPNAME = "maxPending"; public static final int MAX_PENDING_DEFAULT = 0; @@ -302,8 +314,10 @@ defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); @@ -329,12 +343,15 @@ protected String _destinationName; protected String _selector; protected boolean _transacted; + protected boolean _consTransacted; /** Determines whether this producer sends persistent messages. */ protected boolean _persistent; /** Holds the acknowledgement mode used for sending and receiving messages. */ - private int _ackMode; + protected int _ackMode; + + protected int _consAckMode; /** Determines what size of messages this producer sends. */ protected int _messageSize; @@ -451,16 +468,13 @@ /** Holds the message consumer to receive the ping replies through. */ protected MessageConsumer[] _consumer; - /** - * Holds the number of consumers that will be attached to each destination in the test. Each pings will result in - * a message being received by each of these clients in a pub/sub tests, and by only one at a time in a p2p test. - */ - static int _consumersPerDestination = 1; - /** The prompt to display when asking the user to kill the broker for failover testing. */ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; private String _clientID; + /** Keeps count of the total messages sent purely for debugging purposes. */ + private static AtomicInteger numSent = new AtomicInteger(); + /** * Creates a ping producer with the specified parameters, of which there are many. See the class level comments * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on @@ -472,7 +486,7 @@ */ public PingPongProducer(Properties overrides) throws Exception { - log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); // Create a set of parsed properties from the defaults overriden by the passed in values. ParsedProperties properties = new ParsedProperties(defaults); @@ -486,6 +500,7 @@ _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); _selector = properties.getProperty(SELECTOR_PROPNAME); _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); + _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); @@ -502,6 +517,7 @@ _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); // Check that one or more destinations were specified. @@ -532,7 +548,7 @@ */ public void establishConnection(boolean producer, boolean consumer) throws Exception { - log.debug("public void establishConnection(): called"); + // log.debug("public void establishConnection(): called"); // Generate a unique identifying name for this client, based on it ip address and the current time. InetAddress address = InetAddress.getLocalHost(); @@ -548,7 +564,7 @@ for (int i = 0; i < _noOfConsumers; i++) { - _consumerSession[i] = (Session) _consumerConnection[i].createSession(_transacted, _ackMode); + _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode); } // Create the destinations to send pings to and receive replies from. @@ -579,8 +595,14 @@ */ protected void createConnection(String clientID) throws AMQException, URLSyntaxException { + // log.debug("protected void createConnection(String clientID = " + clientID + "): called"); + + // log.debug("Creating a connection for the message producer."); + _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); + // log.debug("Creating " + _noOfConsumers + " connections for the consumers."); + _consumerConnection = new Connection[_noOfConsumers]; for (int i = 0; i < _noOfConsumers; i++) @@ -654,12 +676,12 @@ */ public List<Destination> getReplyDestinations() { - log.debug("public List<Destination> getReplyDestinations(): called"); + // log.debug("public List<Destination> getReplyDestinations(): called"); List<Destination> replyDestinations = new ArrayList<Destination>(); replyDestinations.add(_replyDestination); - log.debug("replyDestinations = " + replyDestinations); + // log.debug("replyDestinations = " + replyDestinations); return replyDestinations; } @@ -672,12 +694,12 @@ */ public void createProducer() throws JMSException { - log.debug("public void createProducer(): called"); + // log.debug("public void createProducer(): called"); _producer = (MessageProducer) _producerSession.createProducer(null); _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); } /** @@ -695,14 +717,14 @@ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, boolean durable) throws JMSException, AMQException { - log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called"); + + durable + "): called");*/ _pingDestinations = new ArrayList<Destination>(); // Create the desired number of ping destinations and consumers for them. - log.debug("Creating " + noOfDestinations + " destinations to ping."); + // log.debug("Creating " + noOfDestinations + " destinations to ping."); for (int i = 0; i < noOfDestinations; i++) { @@ -713,12 +735,12 @@ // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. if (unique) { - log.debug("Creating unique destinations."); + // log.debug("Creating unique destinations."); id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); } else { - log.debug("Creating shared destinations."); + // log.debug("Creating shared destinations."); id = "_" + _queueSharedID.incrementAndGet(); } @@ -728,14 +750,14 @@ if (!durable) { destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); - log.debug("Created non-durable topic " + destination); + // log.debug("Created non-durable topic " + destination); } else { destination = AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), _clientID, (AMQConnection) _connection); - log.debug("Created durable topic " + destination); + // log.debug("Created durable topic " + destination); } } // Otherwise this is a p2p pinger, in which case create queues. @@ -749,7 +771,7 @@ ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, ExchangeDefaults.DIRECT_EXCHANGE_NAME); - log.debug("Created queue " + destination); + // log.debug("Created queue " + destination); } // Keep the destination. @@ -767,10 +789,12 @@ */ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException { - log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations - + ", String selector = " + selector + "): called"); + /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations + + ", String selector = " + selector + "): called");*/ - log.debug("Creating " + destinations.size() + " reply consumers."); + // log.debug("There are " + destinations.size() + " destinations."); + // log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); for (Destination destination : destinations) { @@ -793,7 +817,7 @@ } }); - log.debug("Set this to listen to replies sent to destination: " + destination); + // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); } } } @@ -807,95 +831,121 @@ */ public void onMessageWithConsumerNo(Message message, int consumerNo) { - // log.debug("public void onMessage(Message message): called"); - + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); try { + long now = System.nanoTime(); + long timestamp = getTimestamp(message); + long pingTime = now - timestamp; + + // NDC.push("cons" + consumerNo); + // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); // log.debug("correlationID = " + correlationID); - // Countdown on the traffic light if there is one for the matching correlation id. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + int num = message.getIntProperty("MSG_NUM"); + // log.info("Message " + num + " received."); + + boolean isRedelivered = message.getJMSRedelivered(); + // log.debug("isRedelivered = " + isRedelivered); - if (perCorrelationId != null) + if (!isRedelivered) { - CountDownLatch trafficLight = perCorrelationId.trafficLight; + // Countdown on the traffic light if there is one for the matching correlation id. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + if (perCorrelationId != null) + { + CountDownLatch trafficLight = perCorrelationId.trafficLight; - // Restart the timeout timer on every message. - perCorrelationId.timeOutStart = System.nanoTime(); + // Restart the timeout timer on every message. + perCorrelationId.timeOutStart = System.nanoTime(); - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); - // Decrement the countdown latch. Before this point, it is possible that two threads might enter this - // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block - // ensures that each thread will get a unique value for the remaining messages. - long trueCount = -1; - long remainingCount = -1; + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount = -1; + long remainingCount = -1; - synchronized (trafficLight) - { - trafficLight.countDown(); + synchronized (trafficLight) + { + trafficLight.countDown(); - trueCount = trafficLight.getCount(); - remainingCount = trueCount - 1; + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; - // Decrement the count of sent but not yet received messages. - int unreceived = _unreceived.decrementAndGet(); - int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + // Decrement the count of sent but not yet received messages. + int unreceived = _unreceived.decrementAndGet(); + int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); - // Release a waiting sender if there is one. - synchronized (_sendPauseMonitor) - { - if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) - // && (_sendPauseBarrier.getNumberWaiting() == 1)) + // Release a waiting sender if there is one. + synchronized (_sendPauseMonitor) { - log.debug("unreceived size estimate under limit = " + unreceivedSize); - - // Wait on the send pause barrier for the limit to be re-established. - /*try - {*/ - // _sendPauseBarrier.await(); - _sendPauseMonitor.notify(); - /*} - catch (InterruptedException e) + if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) + // && (_sendPauseBarrier.getNumberWaiting() == 1)) { - throw new RuntimeException(e); + // log.debug("unreceived size estimate under limit = " + unreceivedSize); + + // Wait on the send pause barrier for the limit to be re-established. + /*try + {*/ + // _sendPauseBarrier.await(); + _sendPauseMonitor.notify(); + /*} + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (BrokenBarrierException e) + { + throw new RuntimeException(e); + }*/ } - catch (BrokenBarrierException e) - { - throw new RuntimeException(e); - }*/ } - } - // log.debug("remainingCount = " + remainingCount); - // log.debug("trueCount = " + trueCount); + // NDC.push("/rem" + remainingCount); - // Commit on transaction batch size boundaries. At this point in time the waiting producer remains - // blocked, even on the last message. - if ((remainingCount % _txBatchSize) == 0) - { - commitTx(_consumerSession[consumerNo]); - } + // log.debug("remainingCount = " + remainingCount); + // log.debug("trueCount = " + trueCount); - // Forward the message and remaining count to any interested chained message listener. - if (_chainedMessageListener != null) - { - _chainedMessageListener.onMessage(message, (int) remainingCount); - } + // Commit on transaction batch size boundaries. At this point in time the waiting producer remains + // blocked, even on the last message. + // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on + // each batch boundary. For pub/sub each consumer gets every message so no division is done. + long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // log.debug("commitCount = " + commitCount); - // Check if this is the last message, in which case release any waiting producers. This is done - // after the transaction has been committed and any listeners notified. - if (trueCount == 1) - { - trafficLight.countDown(); + if ((commitCount % _txBatchSize) == 0) + { + // log.debug("Trying commit for consumer " + consumerNo + "."); + commitTx(_consumerSession[consumerNo]); + } + + // Forward the message and remaining count to any interested chained message listener. + if (_chainedMessageListener != null) + { + _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); + } + + // Check if this is the last message, in which case release any waiting producers. This is done + // after the transaction has been committed and any listeners notified. + if (trueCount == 1) + { + trafficLight.countDown(); + } } } + else + { + log.warn("Got unexpected message with correlationId: " + correlationID); + } } else { - log.warn("Got unexpected message with correlationId: " + correlationID); + log.warn("Got redelivered message, ignoring."); } // Print out ping times for every message in verbose mode only. @@ -914,8 +964,11 @@ { log.warn("There was a JMSException: " + e.getMessage(), e); } - - // log.debug("public void onMessage(Message message): ending"); + finally + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + // NDC.clear(); + } } /** @@ -937,8 +990,8 @@ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) throws JMSException, InterruptedException { - log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ // Generate a unique correlation id to put on the messages before sending them, if one was not specified. if (messageCorrelationId == null) @@ -948,6 +1001,8 @@ try { + // NDC.push("prod"); + // Create a count down latch to count the number of replies with. This is created before the messages are // sent so that the replies cannot be received before the count down is created. // One is added to this, so that the last reply becomes a special case. The special case is that the @@ -979,16 +1034,16 @@ allMessagesReceived = numReplies == getExpectedNumPings(numPings); - log.debug("numReplies = " + numReplies); - log.debug("allMessagesReceived = " + allMessagesReceived); + // log.debug("numReplies = " + numReplies); + // log.debug("allMessagesReceived = " + allMessagesReceived); // Recheck the timeout condition. long now = System.nanoTime(); long lastMessageReceievedAt = perCorrelationId.timeOutStart; timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - log.debug("now = " + now); - log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + // log.debug("now = " + now); + // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); } while (!timedOut && !allMessagesReceived); @@ -1003,7 +1058,7 @@ // commitTx(_consumerSession); - log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); return numReplies; } @@ -1011,6 +1066,7 @@ // so will be a memory leak if this is not done. finally { + // NDC.pop(); perCorrelationIds.remove(messageCorrelationId); } } @@ -1026,8 +1082,8 @@ */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { - log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ if (message == null) { @@ -1111,7 +1167,7 @@ if (unreceivedSize > _maxPendingSize) { - log.debug("unreceived size estimate over limit = " + unreceivedSize); + // log.debug("unreceived size estimate over limit = " + unreceivedSize); // Wait on the send pause barrier for the limit to be re-established. try @@ -1140,11 +1196,19 @@ // Send the message either to its round robin destination, or its default destination. if (destination == null) { + int num = numSent.incrementAndGet(); + message.setIntProperty("MSG_NUM", num); + setTimestamp(message); _producer.send(message); + // log.info("Message " + num + " sent."); } else { + int num = numSent.incrementAndGet(); + message.setIntProperty("MSG_NUM", num); + setTimestamp(message); _producer.send(destination, message); + // log.info("Message " + num + " sent."); } // Increase the unreceived size, this may actually happen aftern the message is recevied. @@ -1162,6 +1226,7 @@ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. if (((i + 1) % _txBatchSize) == 0) { + // log.debug("Trying commit on producer session."); committed = commitTx(_producerSession); } @@ -1179,7 +1244,7 @@ { // Generate a sample message and time stamp it. Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); - setTimestamp(msg); + // setTimestamp(msg); // Send the message and wait for a reply. pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); @@ -1187,12 +1252,12 @@ catch (JMSException e) { _publish = false; - log.debug("There was a JMSException: " + e.getMessage(), e); + // log.debug("There was a JMSException: " + e.getMessage(), e); } catch (InterruptedException e) { _publish = false; - log.debug("There was an interruption: " + e.getMessage(), e); + // log.debug("There was an interruption: " + e.getMessage(), e); } } @@ -1230,7 +1295,7 @@ // Timestamp the message in nanoseconds. - setTimestamp(msg); + // setTimestamp(msg); return msg; } @@ -1299,7 +1364,7 @@ */ public void onException(JMSException e) { - log.debug("public void onException(JMSException e = " + e + "): called", e); + // log.debug("public void onException(JMSException e = " + e + "): called", e); _publish = false; } @@ -1327,14 +1392,14 @@ */ public void close() throws JMSException { - log.debug("public void close(): called"); + // log.debug("public void close(): called"); try { if (_connection != null) { _connection.close(); - log.debug("Close connection."); + // log.debug("Close connection."); } for (int i = 0; i < _noOfConsumers; i++) @@ -1342,7 +1407,7 @@ if (_consumerConnection[i] != null) { _consumerConnection[i].close(); - log.debug("Closed consumer connection."); + // log.debug("Closed consumer connection."); } } } @@ -1382,7 +1447,7 @@ */ protected boolean commitTx(Session session) throws JMSException { - // log.debug("protected void commitTx(Session controlSession): called"); + // log.debug("protected void commitTx(Session session): called"); boolean committed = false; @@ -1401,6 +1466,8 @@ if (session.getTransacted()) { + // log.debug("Session is transacted."); + try { if (_failBeforeCommit) @@ -1414,10 +1481,10 @@ waitForUser(KILL_BROKER_PROMPT); } - // long l = System.nanoTime(); + long start = System.nanoTime(); session.commit(); committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms"); + // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); if (_failAfterCommit) { @@ -1430,26 +1497,26 @@ waitForUser(KILL_BROKER_PROMPT); } - // log.trace("Session Commited."); + // log.debug("Session Commited."); } catch (JMSException e) { - log.debug("JMSException on commit:" + e.getMessage(), e); + // log.debug("JMSException on commit:" + e.getMessage(), e); // Warn that the bounce back client is not available. if (e.getLinkedException() instanceof AMQNoConsumersException) { - log.debug("No consumers on queue."); + // log.debug("No consumers on queue."); } try { session.rollback(); - log.debug("Message rolled back."); + // log.debug("Message rolled back."); } catch (JMSException jmse) { - log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); + // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); // Both commit and rollback failed. Throw the rollback exception. throw jmse; @@ -1488,7 +1555,7 @@ */ public int getConsumersPerDestination() { - return _consumersPerDestination; + return _noOfConsumers; } /** @@ -1500,7 +1567,9 @@ */ public int getExpectedNumPings(int numpings) { - log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); + // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); + + // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); return numpings * (_isPubSub ? getConsumersPerDestination() : 1); } @@ -1517,7 +1586,17 @@ */ public static interface ChainedMessageListener { - public void onMessage(Message message, int remainingCount) throws JMSException; + /** + * Notifies interested listeners about message arrival and important test stats, the number of messages + * remaining in the test, and the messages send timestamp. + * + * @param message The newly arrived message. + * @param remainingCount The number of messages left to complete the test. + * @param latency The nanosecond latency of the message. + * + * @throws JMSException Any JMS exceptions is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException; } /**
