Author: rupertlssmith
Date: Wed Oct 3 09:32:09 2007
New Revision: 581648
URL: http://svn.apache.org/viewvc?rev=581648&view=rev
Log:
Merged revisions 581647 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r581647 | rupertlssmith | 2007-10-03 17:28:38 +0100 (Wed, 03 Oct 2007) | 1
line
Performance enhancements for the tests, producers stalled individually above
maxPending size.
........
Modified:
incubator/qpid/branches/M2/ (props changed)
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Oct 3 09:32:09 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-581628
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-581628,581647
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=581648&r1=581647&r2=581648&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
Wed Oct 3 09:32:09 2007
@@ -29,11 +29,9 @@
import uk.co.thebadgerset.junit.extensions.TimingController;
import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.ObjectMessage;
import java.util.Collections;
import java.util.HashMap;
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=581648&r1=581647&r2=581648&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Wed Oct 3 09:32:09 2007
@@ -423,14 +423,20 @@
* Holds a monitor which is used to synchronize sender and receivers
threads, where the sender has elected
* to wait until the number of unreceived message is reduced before
continuing to send.
*/
- protected static final Object _sendPauseMonitor = new Object();
+ protected final Object _sendPauseMonitor = new Object();
/** Keeps a count of the number of message currently sent but not
received. */
- protected static AtomicInteger _unreceived = new AtomicInteger(0);
+ protected AtomicInteger _unreceived = new AtomicInteger(0);
/** A source for providing sequential unique correlation ids. These will
be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
+ /** A source for providing sequential unqiue ids for instances of this
class to be identifed with. */
+ private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
+
+ /** Holds this instances unique id. */
+ private int instanceId;
+
/**
* Holds a map from message ids to latches on which threads wait for
replies. This map is shared accross multiple
* ping producers on the same JVM.
@@ -507,6 +513,7 @@
public PingPongProducer(Properties overrides) throws Exception
{
// log.debug("public PingPongProducer(Properties overrides = " +
overrides + "): called");
+ instanceId = _instanceIdGenerator.getAndIncrement();
// Create a set of parsed properties from the defaults overriden by
the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
@@ -814,9 +821,9 @@
/*log.debug("public void createReplyConsumers(Collection<Destination>
destinations = " + destinations
+ ", String selector = " + selector + "): called");*/
- // log.debug("There are " + destinations.size() + " destinations.");
- // log.debug("Creating " + _noOfConsumers + " consumers on each
destination.");
- // log.debug("Total number of consumers is: " + (destinations.size() *
_noOfConsumers));
+ log.debug("There are " + destinations.size() + " destinations.");
+ log.debug("Creating " + _noOfConsumers + " consumers on each
destination.");
+ log.debug("Total number of consumers is: " + (destinations.size() *
_noOfConsumers));
for (Destination destination : destinations)
{
@@ -839,7 +846,7 @@
}
});
- // log.debug("Set consumer " + i + " to listen to replies sent
to destination: " + destination);
+ log.debug("Set consumer " + i + " to listen to replies sent to
destination: " + destination);
}
}
}
@@ -861,7 +868,7 @@
long timestamp = getTimestamp(message);
long pingTime = now - timestamp;
- NDC.push("cons" + consumerNo);
+ // NDC.push("id" + instanceId + "/cons" + consumerNo);
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
@@ -887,38 +894,41 @@
// log.debug("Reply was expected, decrementing the latch
for the id, " + correlationID);
- // Decrement the countdown latch. Before this point, it is
possible that two threads might enter this
- // method simultanesouly with the same correlation id.
Decrementing the latch in a synchronized block
- // ensures that each thread will get a unique value for
the remaining messages.
- long trueCount;
- long remainingCount;
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
- synchronized (trafficLight)
+ // Release waiting senders if there are some and using
maxPending limit.
+ if ((_maxPendingSize > 0))
{
- trafficLight.countDown();
-
- trueCount = trafficLight.getCount();
- remainingCount = trueCount - 1;
-
// Decrement the count of sent but not yet received
messages.
int unreceived = _unreceived.decrementAndGet();
int unreceivedSize =
(unreceived * ((_messageSize == 0) ? 1 :
_messageSize))
/ (_isPubSub ? getConsumersPerDestination() : 1);
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
-
- // Release a waiting sender if there is one.
synchronized (_sendPauseMonitor)
{
- if ((_maxPendingSize > 0) && (unreceivedSize <
_maxPendingSize))
+ if (unreceivedSize < _maxPendingSize)
{
_sendPauseMonitor.notify();
}
}
+ }
+
+ // Decrement the countdown latch. Before this point, it is
possible that two threads might enter this
+ // method simultanesouly with the same correlation id.
Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for
the remaining messages.
+ long trueCount;
+ long remainingCount;
- NDC.push("/rem" + remainingCount);
+ synchronized (trafficLight)
+ {
+ trafficLight.countDown();
+
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
+
+ // NDC.push("/rem" + remainingCount);
// log.debug("remainingCount = " + remainingCount);
// log.debug("trueCount = " + trueCount);
@@ -1069,7 +1079,7 @@
// commitTx(_consumerSession);
- // //log.debug("public int pingAndWaitForReply(Message message,
int numPings, long timeout): ending");
+ // log.debug("public int pingAndWaitForReply(Message message, int
numPings, long timeout): ending");
return numReplies;
}
@@ -1146,109 +1156,131 @@
*/
protected boolean sendMessage(int i, Message message) throws JMSException
{
- // log.debug("protected boolean sendMessage(int i = " + i + ", Message
message): called");
- // log.debug("_txBatchSize = " + _txBatchSize);
-
- // Round robin the destinations as the messages are sent.
- Destination destination = _pingDestinations.get(i %
_pingDestinations.size());
-
- // Prompt the user to kill the broker when doing failover testing.
- _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
-
- // If necessary, wait until the max pending message size comes within
its limit.
- synchronized (_sendPauseMonitor)
+ try
{
- // Used to keep track of the number of times that send has to wait.
- int numWaits = 0;
+ NDC.push("id" + instanceId + "/prod");
- // The maximum number of waits before the test gives up and fails.
This has been chosen to correspond with
- // the test timeout.
- int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+ // log.debug("protected boolean sendMessage(int i = " + i + ",
Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
- while ((_maxPendingSize > 0))
- {
- // Get the size estimate of sent but not yet received messages.
- int unreceived = _unreceived.get();
- int unreceivedSize =
- (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) /
(_isPubSub ? getConsumersPerDestination() : 1);
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i %
_pingDestinations.size());
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
- // log.debug("_maxPendingSize = " + _maxPendingSize);
+ // Prompt the user to kill the broker when doing failover testing.
+ _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
- if (unreceivedSize > _maxPendingSize)
+ // If necessary, wait until the max pending message size comes
within its limit.
+ if (_maxPendingSize > 0)
+ {
+ synchronized (_sendPauseMonitor)
{
- // log.debug("unreceived size estimate over limit = " +
unreceivedSize);
+ // Used to keep track of the number of times that send has
to wait.
+ int numWaits = 0;
- // Wait on the send pause barrier for the limit to be
re-established.
- try
- {
- _sendPauseMonitor.wait(10000);
- numWaits++;
- }
- catch (InterruptedException e)
- {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
+ // The maximum number of waits before the test gives up
and fails. This has been chosen to correspond with
+ // the test timeout.
+ int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
- // Fail the test if the send has had to wait more than the
maximum allowed number of times.
- if (numWaits >= waitLimit)
+ while (true)
{
- String errorMessage =
- "Send has had to wait for the unreceivedSize (" +
unreceivedSize
- + ") to come below the maxPendingSize (" +
_maxPendingSize + ") more that " + waitLimit
- + " times.";
- log.warn(errorMessage);
- throw new RuntimeException(errorMessage);
+ // Get the size estimate of sent but not yet received
messages.
+ int unreceived = _unreceived.get();
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 :
_messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+ // log.debug("_maxPendingSize = " + _maxPendingSize);
+
+ if (unreceivedSize > _maxPendingSize)
+ {
+ // log.debug("unreceived size estimate over limit
= " + unreceivedSize);
+
+ // Fail the test if the send has had to wait more
than the maximum allowed number of times.
+ if (numWaits > waitLimit)
+ {
+ String errorMessage =
+ "Send has had to wait for the
unreceivedSize (" + unreceivedSize
+ + ") to come below the maxPendingSize (" +
_maxPendingSize + ") more that " + waitLimit
+ + " times.";
+ log.warn(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ // Wait on the send pause barrier for the limit to
be re-established.
+ try
+ {
+ long start = System.nanoTime();
+ _sendPauseMonitor.wait(10000);
+ long end = System.nanoTime();
+
+ // Count the wait only if it was for > 99% of
the requested wait time.
+ if (((float) (end - start) / (float) (10000 *
1000000L)) > 0.99)
+ {
+ numWaits++;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ {
+ break;
+ }
}
}
- else
- {
- break;
- }
}
- }
- // Send the message either to its round robin destination, or its
default destination.
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
+ // Send the message either to its round robin destination, or its
default destination.
+ // int num = numSent.incrementAndGet();
+ // message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
- // Increase the unreceived size, this may actually happen after the
message is received.
- // The unreceived size is incremented by the number of consumers that
will get a copy of the message,
- // in pub/sub mode.
- // _unreceived.getAndIncrement();
- /*int newUnreceivedCount =*/ _unreceived.addAndGet(_isPubSub ?
getConsumersPerDestination() : 1);
- // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+ // Increase the unreceived size, this may actually happen after
the message is received.
+ // The unreceived size is incremented by the number of consumers
that will get a copy of the message,
+ // in pub/sub mode.
+ if (_maxPendingSize > 0)
+ {
+ int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ?
getConsumersPerDestination() : 1);
+ // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+ }
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
- {
- _rateLimiter.throttle();
- }
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
- // Call commit every time the commit batch size is reached.
- boolean committed = false;
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is
the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ // log.debug("Trying commit on producer session.");
+ committed = commitTx(_producerSession);
+ }
- // Commit on every transaction batch size boundary. Here i + 1 is the
count of actual messages sent.
- if (((i + 1) % _txBatchSize) == 0)
+ return committed;
+ }
+ finally
{
- // log.debug("Trying commit on producer session.");
- committed = commitTx(_producerSession);
+ NDC.clear();
}
-
- return committed;
}
/**
@@ -1269,7 +1301,7 @@
failFlag = false;
}
- // log.trace("Failing Before Send");
+ // log.debug("Failing Before Send");
waitForUser(KILL_BROKER_PROMPT);
}
@@ -1524,7 +1556,7 @@
{
_failBeforeCommit =
waitForUserToPromptOnFailure(_failBeforeCommit);
- // long start = System.nanoTime();
+ long start = System.nanoTime();
session.commit();
committed = true;
// log.debug("Time taken to commit :" + ((System.nanoTime() -
start) / 1000000f) + " ms");