Author: rgreig
Date: Tue May  8 09:31:27 2007
New Revision: 536243

URL: http://svn.apache.org/viewvc?view=rev&rev=536243
Log:
Some robustness added to tests by limiting buffered messages.

Modified:
    incubator/qpid/branches/M2/java/perftests/pom.xml
    
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: incubator/qpid/branches/M2/java/perftests/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?view=diff&rev=536243&r1=536242&r2=536243
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/pom.xml Tue May  8 09:31:27 2007
@@ -137,7 +137,7 @@
                         </property>
                         <property>
                             <name>-Xmx</name>
-                            <value>3072m</value>
+                            <value>256m</value>
                         </property>
                         <property>
                             <name>log4j.configuration</name>
@@ -161,7 +161,7 @@
                         <!-- Single pings. These can be scaled up by 
overriding the parameters when calling the test script. -->
                         <Ping-Once>-n Ping-Once -s [1] -r 1 -t testPingOk -o . 
org.apache.qpid.ping.PingTestPerf</Ping-Once>
                         <Ping-Once-Async>-n Ping-Once-Async -s [1] -r 1 -t 
testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf</Ping-Once-Async>
-                        <Ping-Latency>-n Ping-Latency -s [1000] -d 10S -t 
testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf</Ping-Latency>
+                        <Ping-Latency>-n Ping-Latency -s [1000] -d 10S -t 
testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf 
rate=100</Ping-Latency>
 
                         <!-- More example Tests. These are examples to 
exercise all the features of the test harness. Can scale up with option 
overrides. -->
                         <Ping-Tx>-n Ping-Tx -s [100] -o . -t testAsyncPingOk 
org.apache.qpid.ping.PingAsyncTestPerf transacted=true</Ping-Tx>

Modified: 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=536243&r1=536242&r2=536243
==============================================================================
--- 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Tue May  8 09:31:27 2007
@@ -25,7 +25,9 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,8 +38,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.*;
-import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.MessageProducer;
@@ -96,7 +98,10 @@
  *                                               3 - DUPS_OK_ACKNOWLEDGE
  *                                               257 - NO_ACKNOWLEDGE
  *                                               258 - PRE_ACKNOWLEDGE
- * <tr><td> pauseBatch       <td> 0        <td> In milliseconds. A pause to 
insert between transaction batches.
+ * <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of 
messages sent but not yet received.
+ *                                              Limits the volume of messages 
currently buffered on the client
+ *                                              or broker. Can help scale test 
clients by limiting amount of buffered
+ *                                              data to avoid out of memory 
errors.
  * </table>
  *
  * <p/>This implements the Runnable interface with a run method that 
implements an infinite ping loop. The ping loop
@@ -265,11 +270,8 @@
     /** Defines the default message acknowledgement mode. */
     public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
 
-    /** Holds the name of the property to get the pause between batches 
property from. */
-    public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch";
-
-    /** Defines the default time in milliseconds to wait between commit 
batches. */
-    public static final long PAUSE_AFTER_BATCH_DEFAULT = 0L;
+    public static final String MAX_PENDING_PROPNAME = "maxPending";
+    public static final int MAX_PENDING_DEFAULT = 0;
 
     /** Defines the default prefetch size to use when consuming messages. */
     public static final int PREFETCH_DEFAULT = 100;
@@ -310,8 +312,8 @@
         defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, 
TX_BATCH_SIZE_DEFAULT);
         defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, 
DESTINATION_COUNT_DEFAULT);
         defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
-        defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, 
PAUSE_AFTER_BATCH_DEFAULT);
         defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+        defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
     }
 
     protected String _brokerDetails;
@@ -364,8 +366,20 @@
     protected int _noOfDestinations;
     protected int _rate;
 
-    /** Holds the wait time to insert between every batch of messages 
committed. */
-    private long _pauseBatch;
+    /**
+     * Holds the size of the maximum amount of pending data that the client 
should buffer, sending is suspended
+     * if this limit is breached.
+     */
+    protected int _maxPendingSize;
+
+    /**
+     * Holds a cyclic barrier which is used to synchronize sender and receiver 
threads, where the sender has elected
+     * to wait until the number of unreceived messages is reduced before 
continuing to send.
+     */
+    protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2);
+
+    /** Keeps a count of the number of messages currently sent but not 
received. */
+    protected AtomicInteger _unreceived = new AtomicInteger(0);
 
     /** A source for providing sequential unique correlation ids. These will 
be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -375,7 +389,7 @@
      * ping producers on the same JVM.
      */
     private static Map<String, PerCorrelationId> perCorrelationIds =
-            Collections.synchronizedMap(new HashMap<String, 
PerCorrelationId>());
+        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
 
     /** A convenient formatter to use when time stamping output. */
     protected static final DateFormat timestampFormatter = new 
SimpleDateFormat("hh:mm:ss:SS");
@@ -472,7 +486,7 @@
         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
         _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
         _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
-        _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
+        _maxPendingSize = 
properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
 
         // Check that one or more destinations were specified.
         if (_noOfDestinations < 1)
@@ -556,7 +570,7 @@
     {
         try
         {
-            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][]{}));
+            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][] {}));
 
             // Create a ping producer overriding its defaults with all options 
passed on the command line.
             PingPongProducer pingProducer = new PingPongProducer(options);
@@ -598,8 +612,7 @@
                 Thread.sleep(sleepTime);
             }
             catch (InterruptedException ie)
-            {
-            }
+            { }
         }
     }
 
@@ -650,11 +663,11 @@
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createPingDestinations(int noOfDestinations, String selector, 
String rootName, boolean unique,
-                                       boolean durable) throws JMSException, 
AMQException
+        boolean durable) throws JMSException, AMQException
     {
         log.debug("public void createPingDestinations(int noOfDestinations = " 
+ noOfDestinations + ", String selector = "
-                  + selector + ", String rootName = " + rootName + ", boolean 
unique = " + unique + ", boolean durable = "
-                  + durable + "): called");
+            + selector + ", String rootName = " + rootName + ", boolean unique 
= " + unique + ", boolean durable = "
+            + durable + "): called");
 
         _pingDestinations = new ArrayList<Destination>();
 
@@ -690,8 +703,8 @@
                 else
                 {
                     destination =
-                            AMQTopic.createDurableTopic(new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
-                                                        _clientID, 
(AMQConnection) _connection);
+                        AMQTopic.createDurableTopic(new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+                            _clientID, (AMQConnection) _connection);
                     log.debug("Created durable topic " + destination);
                 }
             }
@@ -700,11 +713,11 @@
             {
                 AMQShortString destinationName = new AMQShortString(rootName + 
id);
                 destination =
-                        new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
destinationName, destinationName, false, false,
-                                     _isDurable);
+                    new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
destinationName, destinationName, false, false,
+                        _isDurable);
                 ((AMQSession) _producerSession).createQueue(destinationName, 
false, _isDurable, false);
                 ((AMQSession) _producerSession).bindQueue(destinationName, 
destinationName, null,
-                                                          
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+                    ExchangeDefaults.DIRECT_EXCHANGE_NAME);
 
                 log.debug("Created queue " + destination);
             }
@@ -725,7 +738,7 @@
     public void createReplyConsumers(Collection<Destination> destinations, 
String selector) throws JMSException
     {
         log.debug("public void createReplyConsumers(Collection<Destination> 
destinations = " + destinations
-                  + ", String selector = " + selector + "): called");
+            + ", String selector = " + selector + "): called");
 
         log.debug("Creating " + destinations.size() + " reply consumers.");
 
@@ -733,8 +746,8 @@
         {
             // Create a consumer for the destination and set this pinger to 
listen to its messages.
             _consumer =
-                    _consumerSession.createConsumer(destination, 
PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
-                                                    selector);
+                _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, 
NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+                    selector);
             _consumer.setMessageListener(this);
 
             log.debug("Set this to listen to replies sent to destination: " + 
destination);
@@ -783,6 +796,31 @@
                     trueCount = trafficLight.getCount();
                     remainingCount = trueCount - 1;
 
+                    // Decrement the count of sent but not yet received 
messages.
+                    int unreceived = _unreceived.decrementAndGet();
+                    int unreceivedSize = (unreceived * ((_messageSize == 0) ? 
1 : _messageSize));
+
+                    // Release a waiting sender if there is one.
+                    if ((_maxPendingSize > 0) && (unreceivedSize < 
_maxPendingSize)
+                            && (_sendPauseBarrier.getNumberWaiting() == 1))
+                    {
+                        log.debug("unreceived size estimate under limit = " + 
unreceivedSize);
+
+                        // Join the sender waiting on the send pause barrier so that it is 
released to continue sending.
+                        try
+                        {
+                            _sendPauseBarrier.await();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                        catch (BrokenBarrierException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
                     // log.debug("remainingCount = " + remainingCount);
                     // log.debug("trueCount = " + trueCount);
 
@@ -849,10 +887,10 @@
      * @throws InterruptedException When interrupted by a timeout
      */
     public int pingAndWaitForReply(Message message, int numPings, long 
timeout, String messageCorrelationId)
-            throws JMSException, InterruptedException
+        throws JMSException, InterruptedException
     {
         log.debug("public int pingAndWaitForReply(Message message, int 
numPings = " + numPings + ", long timeout = "
-                  + timeout + ", String messageCorrelationId = " + 
messageCorrelationId + "): called");
+            + timeout + ", String messageCorrelationId = " + 
messageCorrelationId + "): called");
 
         // Generate a unique correlation id to put on the messages before 
sending them, if one was not specified.
         if (messageCorrelationId == null)
@@ -941,7 +979,7 @@
     public void pingNoWaitForReply(Message message, int numPings, String 
messageCorrelationId) throws JMSException
     {
         log.debug("public void pingNoWaitForReply(Message message, int 
numPings = " + numPings
-                  + ", String messageCorrelationId = " + messageCorrelationId 
+ "): called");
+            + ", String messageCorrelationId = " + messageCorrelationId + "): 
called");
 
         if (message == null)
         {
@@ -1014,6 +1052,29 @@
             waitForUser(KILL_BROKER_PROMPT);
         }
 
+        // Increase the count of sent but not yet received messages.
+        int unreceived = _unreceived.getAndIncrement();
+        int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize));
+
+        if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize))
+        {
+            log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
+
+            // Wait on the send pause barrier for the limit to be 
re-established.
+            try
+            {
+                _sendPauseBarrier.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (BrokenBarrierException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
         // Send the message either to its round robin destination, or its 
default destination.
         if (destination == null)
         {
@@ -1128,7 +1189,7 @@
         {
             Long value = ((AMQMessage) msg).getTimestampProperty(new 
AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
 
-            return value == null ? 0L : value;
+            return (value == null) ? 0L : value;
         }
         else
         {
@@ -1136,7 +1197,6 @@
         }
     }
 
-
     /**
      * Stops the ping loop by clearing the publish flag. The current loop will 
complete before it notices that this flag
      * has been cleared.
@@ -1177,12 +1237,12 @@
     public Thread getShutdownHook()
     {
         return new Thread(new Runnable()
-        {
-            public void run()
-            {
-                stop();
-            }
-        });
+                {
+                    public void run()
+                    {
+                        stop();
+                    }
+                });
     }
 
     /**


Reply via email to