Author: rupertlssmith
Date: Wed Oct  3 09:28:38 2007
New Revision: 581647

URL: http://svn.apache.org/viewvc?rev=581647&view=rev
Log:
Performance enhancements for the tests, producers stalled individually above 
maxPending size.

Modified:
    
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: 
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=581647&r1=581646&r2=581647&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
 Wed Oct  3 09:28:38 2007
@@ -29,11 +29,9 @@
 
 import uk.co.thebadgerset.junit.extensions.TimingController;
 import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.ObjectMessage;
 
 import java.util.Collections;
 import java.util.HashMap;

Modified: 
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=581647&r1=581646&r2=581647&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Wed Oct  3 09:28:38 2007
@@ -423,14 +423,20 @@
      * Holds a monitor which is used to synchronize sender and receivers 
threads, where the sender has elected
      * to wait until the number of unreceived message is reduced before 
continuing to send.
      */
-    protected static final Object _sendPauseMonitor = new Object();
+    protected final Object _sendPauseMonitor = new Object();
 
     /** Keeps a count of the number of message currently sent but not 
received. */
-    protected static AtomicInteger _unreceived = new AtomicInteger(0);
+    protected AtomicInteger _unreceived = new AtomicInteger(0);
 
     /** A source for providing sequential unique correlation ids. These will 
be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
+    /** A source for providing sequential unqiue ids for instances of this 
class to be identifed with. */
+    private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
+
+    /** Holds this instances unique id. */
+    private int instanceId;
+
     /**
      * Holds a map from message ids to latches on which threads wait for 
replies. This map is shared accross multiple
      * ping producers on the same JVM.
@@ -507,6 +513,7 @@
     public PingPongProducer(Properties overrides) throws Exception
     {
         // log.debug("public PingPongProducer(Properties overrides = " + 
overrides + "): called");
+        instanceId = _instanceIdGenerator.getAndIncrement();
 
         // Create a set of parsed properties from the defaults overriden by 
the passed in values.
         ParsedProperties properties = new ParsedProperties(defaults);
@@ -814,9 +821,9 @@
         /*log.debug("public void createReplyConsumers(Collection<Destination> 
destinations = " + destinations
             + ", String selector = " + selector + "): called");*/
 
-        // log.debug("There are " + destinations.size() + " destinations.");
-        // log.debug("Creating " + _noOfConsumers + " consumers on each 
destination.");
-        // log.debug("Total number of consumers is: " + (destinations.size() * 
_noOfConsumers));
+        log.debug("There are " + destinations.size() + " destinations.");
+        log.debug("Creating " + _noOfConsumers + " consumers on each 
destination.");
+        log.debug("Total number of consumers is: " + (destinations.size() * 
_noOfConsumers));
 
         for (Destination destination : destinations)
         {
@@ -839,7 +846,7 @@
                         }
                     });
 
-                // log.debug("Set consumer " + i + " to listen to replies sent 
to destination: " + destination);
+                log.debug("Set consumer " + i + " to listen to replies sent to 
destination: " + destination);
             }
         }
     }
@@ -861,7 +868,7 @@
             long timestamp = getTimestamp(message);
             long pingTime = now - timestamp;
 
-            NDC.push("cons" + consumerNo);
+            // NDC.push("id" + instanceId + "/cons" + consumerNo);
 
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
@@ -887,38 +894,41 @@
 
                     // log.debug("Reply was expected, decrementing the latch 
for the id, " + correlationID);
 
-                    // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
-                    // method simultanesouly with the same correlation id. 
Decrementing the latch in a synchronized block
-                    // ensures that each thread will get a unique value for 
the remaining messages.
-                    long trueCount;
-                    long remainingCount;
+                    // log.debug("unreceived = " + unreceived);
+                    // log.debug("unreceivedSize = " + unreceivedSize);
 
-                    synchronized (trafficLight)
+                    // Release waiting senders if there are some and using 
maxPending limit.
+                    if ((_maxPendingSize > 0))
                     {
-                        trafficLight.countDown();
-
-                        trueCount = trafficLight.getCount();
-                        remainingCount = trueCount - 1;
-
                         // Decrement the count of sent but not yet received 
messages.
                         int unreceived = _unreceived.decrementAndGet();
                         int unreceivedSize =
                             (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize))
                             / (_isPubSub ? getConsumersPerDestination() : 1);
 
-                        // log.debug("unreceived = " + unreceived);
-                        // log.debug("unreceivedSize = " + unreceivedSize);
-
-                        // Release a waiting sender if there is one.
                         synchronized (_sendPauseMonitor)
                         {
-                            if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize))
+                            if (unreceivedSize < _maxPendingSize)
                             {
                                 _sendPauseMonitor.notify();
                             }
                         }
+                    }
+
+                    // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
+                    // method simultanesouly with the same correlation id. 
Decrementing the latch in a synchronized block
+                    // ensures that each thread will get a unique value for 
the remaining messages.
+                    long trueCount;
+                    long remainingCount;
 
-                        NDC.push("/rem" + remainingCount);
+                    synchronized (trafficLight)
+                    {
+                        trafficLight.countDown();
+
+                        trueCount = trafficLight.getCount();
+                        remainingCount = trueCount - 1;
+
+                        // NDC.push("/rem" + remainingCount);
 
                         // log.debug("remainingCount = " + remainingCount);
                         // log.debug("trueCount = " + trueCount);
@@ -1069,7 +1079,7 @@
 
             // commitTx(_consumerSession);
 
-            // //log.debug("public int pingAndWaitForReply(Message message, 
int numPings, long timeout): ending");
+            // log.debug("public int pingAndWaitForReply(Message message, int 
numPings, long timeout): ending");
 
             return numReplies;
         }
@@ -1146,109 +1156,131 @@
      */
     protected boolean sendMessage(int i, Message message) throws JMSException
     {
-        // log.debug("protected boolean sendMessage(int i = " + i + ", Message 
message): called");
-        // log.debug("_txBatchSize = " + _txBatchSize);
-
-        // Round robin the destinations as the messages are sent.
-        Destination destination = _pingDestinations.get(i % 
_pingDestinations.size());
-
-        // Prompt the user to kill the broker when doing failover testing.
-        _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
-
-        // If necessary, wait until the max pending message size comes within 
its limit.
-        synchronized (_sendPauseMonitor)
+        try
         {
-            // Used to keep track of the number of times that send has to wait.
-            int numWaits = 0;
+            NDC.push("id" + instanceId + "/prod");
 
-            // The maximum number of waits before the test gives up and fails. 
This has been chosen to correspond with
-            // the test timeout.
-            int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+            // log.debug("protected boolean sendMessage(int i = " + i + ", 
Message message): called");
+            // log.debug("_txBatchSize = " + _txBatchSize);
 
-            while ((_maxPendingSize > 0))
-            {
-                // Get the size estimate of sent but not yet received messages.
-                int unreceived = _unreceived.get();
-                int unreceivedSize =
-                    (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / 
(_isPubSub ? getConsumersPerDestination() : 1);
+            // Round robin the destinations as the messages are sent.
+            Destination destination = _pingDestinations.get(i % 
_pingDestinations.size());
 
-                // log.debug("unreceived = " + unreceived);
-                // log.debug("unreceivedSize = " + unreceivedSize);
-                // log.debug("_maxPendingSize = " + _maxPendingSize);
+            // Prompt the user to kill the broker when doing failover testing.
+            _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
 
-                if (unreceivedSize > _maxPendingSize)
+            // If necessary, wait until the max pending message size comes 
within its limit.
+            if (_maxPendingSize > 0)
+            {
+                synchronized (_sendPauseMonitor)
                 {
-                    // log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
+                    // Used to keep track of the number of times that send has 
to wait.
+                    int numWaits = 0;
 
-                    // Wait on the send pause barrier for the limit to be 
re-established.
-                    try
-                    {
-                        _sendPauseMonitor.wait(10000);
-                        numWaits++;
-                    }
-                    catch (InterruptedException e)
-                    {
-                        // Restore the interrupted status
-                        Thread.currentThread().interrupt();
-                        throw new RuntimeException(e);
-                    }
+                    // The maximum number of waits before the test gives up 
and fails. This has been chosen to correspond with
+                    // the test timeout.
+                    int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
 
-                    // Fail the test if the send has had to wait more than the 
maximum allowed number of times.
-                    if (numWaits >= waitLimit)
+                    while (true)
                     {
-                        String errorMessage =
-                            "Send has had to wait for the unreceivedSize (" + 
unreceivedSize
-                            + ") to come below the maxPendingSize (" + 
_maxPendingSize + ") more that " + waitLimit
-                            + " times.";
-                        log.warn(errorMessage);
-                        throw new RuntimeException(errorMessage);
+                        // Get the size estimate of sent but not yet received 
messages.
+                        int unreceived = _unreceived.get();
+                        int unreceivedSize =
+                            (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize))
+                            / (_isPubSub ? getConsumersPerDestination() : 1);
+
+                        // log.debug("unreceived = " + unreceived);
+                        // log.debug("unreceivedSize = " + unreceivedSize);
+                        // log.debug("_maxPendingSize = " + _maxPendingSize);
+
+                        if (unreceivedSize > _maxPendingSize)
+                        {
+                            // log.debug("unreceived size estimate over limit 
= " + unreceivedSize);
+
+                            // Fail the test if the send has had to wait more 
than the maximum allowed number of times.
+                            if (numWaits > waitLimit)
+                            {
+                                String errorMessage =
+                                    "Send has had to wait for the 
unreceivedSize (" + unreceivedSize
+                                    + ") to come below the maxPendingSize (" + 
_maxPendingSize + ") more that " + waitLimit
+                                    + " times.";
+                                log.warn(errorMessage);
+                                throw new RuntimeException(errorMessage);
+                            }
+
+                            // Wait on the send pause barrier for the limit to 
be re-established.
+                            try
+                            {
+                                long start = System.nanoTime();
+                                _sendPauseMonitor.wait(10000);
+                                long end = System.nanoTime();
+
+                                // Count the wait only if it was for > 99% of 
the requested wait time.
+                                if (((float) (end - start) / (float) (10000 * 
1000000L)) > 0.99)
+                                {
+                                    numWaits++;
+                                }
+                            }
+                            catch (InterruptedException e)
+                            {
+                                // Restore the interrupted status
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        else
+                        {
+                            break;
+                        }
                     }
                 }
-                else
-                {
-                    break;
-                }
             }
-        }
 
-        // Send the message either to its round robin destination, or its 
default destination.
-        int num = numSent.incrementAndGet();
-        message.setIntProperty("MSG_NUM", num);
-        setTimestamp(message);
+            // Send the message either to its round robin destination, or its 
default destination.
+            // int num = numSent.incrementAndGet();
+            // message.setIntProperty("MSG_NUM", num);
+            setTimestamp(message);
 
-        if (destination == null)
-        {
-            _producer.send(message);
-        }
-        else
-        {
-            _producer.send(destination, message);
-        }
+            if (destination == null)
+            {
+                _producer.send(message);
+            }
+            else
+            {
+                _producer.send(destination, message);
+            }
 
-        // Increase the unreceived size, this may actually happen after the 
message is received.
-        // The unreceived size is incremented by the number of consumers that 
will get a copy of the message,
-        // in pub/sub mode.
-        // _unreceived.getAndIncrement();
-        /*int newUnreceivedCount =*/ _unreceived.addAndGet(_isPubSub ? 
getConsumersPerDestination() : 1);
-        // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+            // Increase the unreceived size, this may actually happen after 
the message is received.
+            // The unreceived size is incremented by the number of consumers 
that will get a copy of the message,
+            // in pub/sub mode.
+            if (_maxPendingSize > 0)
+            {
+                int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? 
getConsumersPerDestination() : 1);
+                // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+            }
 
-        // Apply message rate throttling if a rate limit has been set up.
-        if (_rateLimiter != null)
-        {
-            _rateLimiter.throttle();
-        }
+            // Apply message rate throttling if a rate limit has been set up.
+            if (_rateLimiter != null)
+            {
+                _rateLimiter.throttle();
+            }
 
-        // Call commit every time the commit batch size is reached.
-        boolean committed = false;
+            // Call commit every time the commit batch size is reached.
+            boolean committed = false;
+
+            // Commit on every transaction batch size boundary. Here i + 1 is 
the count of actual messages sent.
+            if (((i + 1) % _txBatchSize) == 0)
+            {
+                // log.debug("Trying commit on producer session.");
+                committed = commitTx(_producerSession);
+            }
 
-        // Commit on every transaction batch size boundary. Here i + 1 is the 
count of actual messages sent.
-        if (((i + 1) % _txBatchSize) == 0)
+            return committed;
+        }
+        finally
         {
-            // log.debug("Trying commit on producer session.");
-            committed = commitTx(_producerSession);
+            NDC.clear();
         }
-
-        return committed;
     }
 
     /**
@@ -1269,7 +1301,7 @@
                 failFlag = false;
             }
 
-            // log.trace("Failing Before Send");
+            // log.debug("Failing Before Send");
             waitForUser(KILL_BROKER_PROMPT);
         }
 
@@ -1524,7 +1556,7 @@
             {
                 _failBeforeCommit = 
waitForUserToPromptOnFailure(_failBeforeCommit);
 
-                // long start = System.nanoTime();
+                long start = System.nanoTime();
                 session.commit();
                 committed = true;
                 // log.debug("Time taken to commit :" + ((System.nanoTime() - 
start) / 1000000f) + " ms");


Reply via email to