Author: ritchiem
Date: Mon Jun 25 07:16:30 2007
New Revision: 550509

URL: http://svn.apache.org/viewvc?view=rev&rev=550509
Log:
Update to provide a SustainedTestCase. This sends batches of messages to the 
broker. The rate of publication is regulated by the average consume rate 
advertised by all connected clients.

Modified:
    incubator/qpid/branches/M2/java/integrationtests/pom.xml
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java

Modified: incubator/qpid/branches/M2/java/integrationtests/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/pom.xml?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/integrationtests/pom.xml Mon Jun 25 
07:16:30 2007
@@ -46,6 +46,12 @@
             <artifactId>qpid-client</artifactId>
         </dependency>
 
+        <dependency>  
+            <groupId>org.slf4j</groupId> 
+            <artifactId>slf4j-log4j12</artifactId>  
+            <version>1.4.0</version>
+        </dependency>
+
         <dependency>
             <groupId>uk.co.thebadgerset</groupId>
             <artifactId>junit-toolkit</artifactId>

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
 Mon Jun 25 07:16:30 2007
@@ -79,10 +79,17 @@
 
     /**
      * Performs the test case actions.
-     *
+     * return from here when you have finished the test.. this will signal the 
controller that the test has ended. 
      * @throws JMSException Any JMSException resulting from reading the 
message are allowed to fall through.
      */
     public void start() throws JMSException;
+
+    /**
+     * Gives notice of termination of the test case actions.
+     *
+     * @throws JMSException Any JMSException resulting from allowed to fall 
through.
+     */
+    public void terminate() throws JMSException, InterruptedException;
 
     /**
      * Gets a report on the actions performed by the test case in its assigned 
role.

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
 Mon Jun 25 07:16:30 2007
@@ -20,23 +20,31 @@
  */
 package org.apache.qpid.interop.testclient;
 
-import java.io.IOException;
-import java.util.*;
-
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
 import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
-import org.apache.qpid.util.ClasspathScanner;
 import org.apache.qpid.util.CommandLineParser;
 import org.apache.qpid.util.PropertiesUtils;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
 /**
  * Implements a test client as described in the interop testing spec
  * 
(http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification).
 A test client is an agent that
@@ -201,7 +209,7 @@
         }
 
         // Open a connection to communicate with the coordinator on.
-        _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, 
brokerUrl, virtualHost);
+        _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, 
clientName, brokerUrl, virtualHost);
 
         session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -219,17 +227,21 @@
         _connection.start();
     }
 
+
+    public static Connection createConnection(String connectionPropsResource, 
String brokerUrl, String virtualHost)
+    {
+        return createConnection(connectionPropsResource, "clientID", 
brokerUrl, virtualHost);
+    }
+
     /**
      * Establishes a JMS connection using a properties file and qpids built in 
JNDI implementation. This is a simple
-     * convenience method for code that does anticipate handling connection 
failures. All exceptions that indicate
-     * that the connection has failed, are wrapped as rutime exceptions, 
preumably handled by a top level failure
-     * handler.
-     *
-     * @todo Make username/password configurable. Allow multiple urls for fail 
over. Once it feels right, move it
-     *       to a Utils library class.
+     * convenience method for code that does anticipate handling connection 
failures. All exceptions that indicate that
+     * the connection has failed, are wrapped as rutime exceptions, preumably 
handled by a top level failure handler.
      *
      * @param connectionPropsResource The name of the connection properties 
file.
-     * @param brokerUrl               The broker url to connect to, 
<tt>null</tt> to use the default from the properties.
+     * @param clientID
+     * @param brokerUrl               The broker url to connect to, 
<tt>null</tt> to use the default from the
+     *                                properties.
      * @param virtualHost             The virtual host to connectio to, 
<tt>null</tt> to use the default.
      *
      * @return A JMS conneciton.
@@ -237,7 +249,7 @@
      * @todo Make username/password configurable. Allow multiple urls for fail 
over. Once it feels right, move it to a
      * Utils library class.
      */
-    public static Connection createConnection(String connectionPropsResource, 
String brokerUrl, String virtualHost)
+    public static Connection createConnection(String connectionPropsResource, 
String clientID, String brokerUrl, String virtualHost)
     {
         log.debug("public static Connection createConnection(String 
connectionPropsResource = " + connectionPropsResource
                   + ", String brokerUrl = " + brokerUrl + ", String 
virtualHost = " + virtualHost + "): called");
@@ -251,7 +263,7 @@
             if (brokerUrl != null)
             {
                 String connectionString =
-                        "amqp://guest:guest/" + ((virtualHost != null) ? 
virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+                        "amqp://guest:guest@" + clientID + "/" + ((virtualHost 
!= null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
                 connectionProps.setProperty(CONNECTION_PROPERTY, 
connectionString);
             }
 
@@ -381,6 +393,14 @@
             {
                 log.info("Received termination instruction from coordinator.");
 
+//                try
+//                {
+//                    currentTestCase.terminate();
+//                }
+//                catch (InterruptedException e)
+//                {
+//                    //
+//                }
                 // Is a cleaner shutdown needed?
                 _connection.close();
                 System.exit(0);

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
 Mon Jun 25 07:16:30 2007
@@ -74,6 +74,11 @@
         // Do nothing.
     }
 
+    public void terminate() throws JMSException
+    {
+        //todo
+    }
+
     public Message getReport(Session session) throws JMSException
     {
         log.debug("public Message getReport(Session session): called");

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
 Mon Jun 25 07:16:30 2007
@@ -170,6 +170,11 @@
         }
     }
 
+    public void terminate() throws JMSException
+    {
+        //todo
+    }
+
     /**
      * Gets a report on the actions performed by the test case in its assigned 
role.
      *

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
 Mon Jun 25 07:16:30 2007
@@ -202,6 +202,11 @@
         }
     }
 
+    public void terminate() throws JMSException, InterruptedException
+    {
+        //todo
+    }
+
     /**
      * Gets a report on the actions performed by the test case in its assigned 
role.
      *

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
 Mon Jun 25 07:16:30 2007
@@ -38,6 +38,7 @@
 import javax.jms.TextMessage;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * Implements test case 3, basic pub/sub. Sends/received a specified number of 
messages to a specified route on the
@@ -52,7 +53,10 @@
 public class SustainedTestClient extends TestCase3BasicPubSub implements 
ExceptionListener
 {
     /** Used for debugging. */
-    private static final Logger log = 
Logger.getLogger(SustainedTestClient.class);
+    private static final Logger debugLog = 
Logger.getLogger(SustainedTestClient.class);
+
+    private static final Logger log = Logger.getLogger("SustainedTest");
+
 
     /** The role to be played by the test. */
     private Roles role;
@@ -83,9 +87,11 @@
     SustainedRateAdapter _rateAdapter;
 
     /**  */
-    int updateInterval;
+    int _batchSize;
+
 
-    private boolean _running = true;
+    private static final long TEN_MILLI_SEC = 10000000;
+    private static final long FIVE_MILLI_SEC = 5000000;
 
     /**
      * Should provide the name of the test case that this class implements. 
The exact names are defined in the interop
@@ -95,7 +101,7 @@
      */
     public String getName()
     {
-        log.debug("public String getName(): called");
+        debugLog.debug("public String getName(): called");
 
         return "Perf_SustainedPubSub";
     }
@@ -111,31 +117,34 @@
      */
     public void assignRole(Roles role, Message assignRoleMessage) throws 
JMSException
     {
-        log.debug("public void assignRole(Roles role = " + role + ", Message 
assignRoleMessage = " + assignRoleMessage
-                  + "): called");
+        debugLog.debug("public void assignRole(Roles role = " + role + ", 
Message assignRoleMessage = " + assignRoleMessage
+                       + "): called");
 
         // Take note of the role to be played.
         this.role = role;
 
         // Extract and retain the test parameters.
         numReceivers = 
assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
-        updateInterval = 
assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+        _batchSize = 
assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
         String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
         String sendUpdateKey = 
assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
         int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
 
-        log.debug("numReceivers = " + numReceivers);
-        log.debug("updateInterval = " + updateInterval);
-        log.debug("ackMode = " + ackMode);
-        log.debug("sendKey = " + sendKey);
-        log.debug("sendUpdateKey = " + sendUpdateKey);
-        log.debug("role = " + role);
+        if (debugLog.isDebugEnabled())
+        {
+            debugLog.debug("numReceivers = " + numReceivers);
+            debugLog.debug("_batchSize = " + _batchSize);
+            debugLog.debug("ackMode = " + ackMode);
+            debugLog.debug("sendKey = " + sendKey);
+            debugLog.debug("sendUpdateKey = " + sendUpdateKey);
+            debugLog.debug("role = " + role);
+        }
 
         switch (role)
         {
             // Check if the sender role is being assigned, and set up a single 
message producer if so.
             case SENDER:
-                log.info("*********** Creating SENDER");
+                log.info("Creating Sender");
                 // Create a new connection to pass the test messages on.
                 connection = new Connection[1];
                 session = new Session[1];
@@ -164,7 +173,7 @@
                 // Otherwise the receiver role is being assigned, so set this 
up to listen for messages on the required number
                 // of receiver connections.
             case RECEIVER:
-                log.info("*********** Creating RECEIVER");
+                log.info("Creating Receiver");
                 // Create the required number of receiver connections.
                 connection = new Connection[numReceivers];
                 session = new Session[numReceivers];
@@ -183,7 +192,7 @@
 
                     MessageConsumer consumer = 
session[i].createConsumer(sendDestination);
 
-                    consumer.setMessageListener(new 
SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], 
sendUpdateDestination));
+                    consumer.setMessageListener(new 
SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], 
sendUpdateDestination));
                 }
 
                 break;
@@ -196,29 +205,32 @@
         }
     }
 
+
     /** Performs the test case actions. */
     public void start() throws JMSException
     {
-        log.debug("public void start(): called");
+        debugLog.debug("public void start(): called");
 
         // Check that the sender role is being performed.
         switch (role)
         {
             // Check if the sender role is being assigned, and set up a single 
message producer if so.
             case SENDER:
-                Message testMessage = session[0].createTextMessage("test");
-
-//            for (int i = 0; i < numMessages; i++)
-                while (_running)
-                {
-                    producer.send(testMessage);
-
-                    _rateAdapter.sentMessage();
-                }
+                _rateAdapter.run();
                 break;
             case RECEIVER:
 
         }
+
+        //return from here when you have finished the test.. this will signal 
the controller and
+    }
+
+    public void terminate() throws JMSException, InterruptedException
+    {
+        if (_rateAdapter != null)
+        {
+            _rateAdapter.stop();
+        }
     }
 
     /**
@@ -232,7 +244,7 @@
      */
     public Message getReport(Session session) throws JMSException
     {
-        log.debug("public Message getReport(Session session): called");
+        debugLog.debug("public Message getReport(Session session): called");
 
         // Close the test connections.
         for (int i = 0; i < connection.length; i++)
@@ -252,89 +264,100 @@
 
         if (linked != null)
         {
-            if (linked instanceof AMQNoRouteException)
+            if (debugLog.isDebugEnabled())
             {
-                log.warn("No route .");
+                debugLog.debug("Linked Exception:" + linked);
             }
-            else if (linked instanceof AMQNoConsumersException)
-            {
-                log.warn("No clients currently available for message:" + 
((AMQNoConsumersException) linked).getUndeliveredMessage());
-            }
-            else
+            if ((linked instanceof AMQNoRouteException)
+                || (linked instanceof AMQNoConsumersException))
             {
+                if (debugLog.isDebugEnabled())
+                {
+                    if (linked instanceof AMQNoConsumersException)
+                    {
+                        debugLog.warn("No clients currently available for 
message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+                    }
+                    else
+                    {
+                        debugLog.warn("No route for message");
+                    }
+                }
 
-                log.warn("LinkedException:" + linked);
+                // Tell the rate adapter that there are no clients ready yet
+                _rateAdapter.NO_CLIENTS = true;
             }
-
-            _rateAdapter.NO_CLIENTS = true;
         }
         else
         {
-            log.warn("Exception:" + linked);
+            debugLog.warn("Exception:" + linked);
         }
     }
 
+    /**
+     * Inner class that listens for messages and sends a report for the time 
taken between receiving the 'start' and
+     * 'end' messages.
+     */
     class SustainedListener implements MessageListener
     {
-        private int _received = 0;
-        private int _updateInterval = 0;
-        private Long _time;
+        /** Number of messages received */
+        private long _received = 0;
+        /** The number of messages in the batch */
+        private int _batchSize = 0;
+        /** Record of the when the 'start' messagse was sen */
+        private Long _startTime;
+        /** Message producer to use to send reports */
         MessageProducer _updater;
+        /** Session to create the report message on */
         Session _session;
+        /** Record of the client ID used for this SustainedListnener */
         String _client;
 
 
-        public SustainedListener(String clientname, int updateInterval, 
Session session, Destination sendDestination) throws JMSException
+        /**
+         * Main Constructor
+         *
+         * @param clientname      The _client id used to identify this 
connection.
+         * @param batchSize       The number of messages that are to be sent 
per batch. Note: This is not used to
+         *                        control the interval between sending reports.
+         * @param session         The session used for communication.
+         * @param sendDestination The destination that update reports should 
be sent to.
+         *
+         * @throws JMSException My occur if creatingthe Producer fails
+         */
+        public SustainedListener(String clientname, int batchSize, Session 
session, Destination sendDestination) throws JMSException
         {
-            _updateInterval = updateInterval;
+            _batchSize = batchSize;
             _client = clientname;
             _session = session;
             _updater = session.createProducer(sendDestination);
         }
 
-        public void setReportInterval(int reportInterval)
-        {
-            _updateInterval = reportInterval;
-            _received = 0;
-        }
-
         public void onMessage(Message message)
         {
-            if (log.isDebugEnabled())
+            if (debugLog.isTraceEnabled())
             {
-                log.debug("Message " + _received + "received in listener");
+                debugLog.trace("Message " + _received + "received in 
listener");
             }
 
+
             if (message instanceof TextMessage)
             {
-
                 try
                 {
-                    if (((TextMessage) message).getText().equals("test"))
+                    _received++;
+                    if (((TextMessage) message).getText().equals("start"))
                     {
-                        if (_received == 0)
-                        {
-                            _time = System.nanoTime();
-                            sendStatus(0, _received);
-                        }
-
-                        _received++;
-
-                        if (_received % _updateInterval == 0)
+                        debugLog.info("Starting Batch");
+                        _startTime = System.nanoTime();
+                    }
+                    else if (((TextMessage) message).getText().equals("end"))
+                    {
+                        if (_startTime != null)
                         {
-                            Long currentTime = System.nanoTime();
-
-                            try
-                            {
-                                sendStatus(currentTime - _time, _received);
-                                _time = currentTime;
-                            }
-                            catch (JMSException e)
-                            {
-                                log.error("Unable to send update.");
-                            }
+                            long currentTime = System.nanoTime();
+                            sendStatus(currentTime - _startTime, _received);
+                            debugLog.info("End Batch");
                         }
-
                     }
                 }
                 catch (JMSException e)
@@ -342,37 +365,68 @@
                     //ignore error
                 }
             }
+
         }
 
-        private void sendStatus(long time, int received) throws JMSException
+        /**
+         * sendStatus creates and sends the report back to the publisher
+         *
+         * @param time     taken for the the last batch
+         * @param received Total number of messages received.
+         *
+         * @throws JMSException if an error occurs during the send
+         */
+        private void sendStatus(long time, long received) throws JMSException
         {
             Message updateMessage = _session.createTextMessage("update");
-            updateMessage.setStringProperty("CLIENT_ID", _client);
+            updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
             updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
             updateMessage.setLongProperty("RECEIVED", received);
             updateMessage.setLongProperty("DURATION", time);
 
-            log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + 
received + " DURATION:" + time);
+            if (debugLog.isInfoEnabled())
+            {
+                debugLog.info("**** SENDING [" + received / _batchSize + 
"]**** "
+                              + "CLIENT_ID:" + _client + " RECEIVED:" + 
received + " DURATION:" + time);
+            }
+
+            // Output on the main log.info the details of this batch
+            if (received / _batchSize % 10 == 0)
+            {
+                log.info("Sending Report [" + received / _batchSize + "] "
+                         + "CLIENT_ID:" + _client + " RECEIVED:" + received + 
" DURATION:" + time);
+            }
 
             _updater.send(updateMessage);
         }
-
     }
 
-    class SustainedRateAdapter implements MessageListener
+
+    /**
+     * This class is used here to adjust the _delay value which in turn is 
used to control the number of messages/second
+     * that are sent through the test system.
+     *
+     * By keeping a record of the messages recevied and the average time taken 
to process the batch size can be
+     * calculated and so the delay can be adjusted to maintain that rate.
+     *
+     * Given that delays of < 10ms can be rounded up the delay is only used 
between messages if the _delay > 10ms * no
+     * messages in the batch. Otherwise the delay is used at the end of the 
batch.
+     */
+    class SustainedRateAdapter implements MessageListener, Runnable
     {
         private SustainedTestClient _client;
-        private long _variance = 250; //no. messages to allow drifting
+        private long _messageVariance = 500; //no. messages to allow drifting
+        private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between 
send and report delay (10ms)
         private volatile long _delay;   //in nanos
         private long _sent;
         private Map<String, Long> _slowClients = new HashMap<String, Long>();
-        private static final long PAUSE_SLEEP = 10; // 10 ms
-        private static final long NO_CLIENT_SLEEP = 1000; // 1s 
-        private static final long MAX_MESSAGE_DRIFT = 1000; // no messages 
drifted from producer
+        private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
+        private static final long NO_CLIENT_SLEEP = 1000; // 1s
         private volatile boolean NO_CLIENTS = true;
         private int _delayShifting;
-        private static final int REPORTS_WITHOUT_CHANGE = 10;
-        private static final double MAXIMUM_DELAY_SHIFT = .02; //2% 
+        private static final int REPORTS_WITHOUT_CHANGE = 5;
+        private boolean _warmedup = false;
+        private static final long EXPECTED_TIME_PER_BATCH = 100000L;
 
         SustainedRateAdapter(SustainedTestClient client)
         {
@@ -381,9 +435,9 @@
 
         public void onMessage(Message message)
         {
-            if (log.isDebugEnabled())
+            if (debugLog.isDebugEnabled())
             {
-                log.debug("SustainedRateAdapter onMessage(Message message = " 
+ message + "): called");
+                debugLog.debug("SustainedRateAdapter onMessage(Message message 
= " + message + "): called");
             }
 
             try
@@ -395,15 +449,25 @@
                 {
                     NO_CLIENTS = false;
                     long duration = message.getLongProperty("DURATION");
-                    long received = message.getLongProperty("RECEIVED");
+                    long totalReceived = message.getLongProperty("RECEIVED");
                     String client = message.getStringProperty("CLIENT_ID");
 
-                    log.info("**** SENDING **** CLIENT_ID:" + client + " 
RECEIVED:" + received + " DURATION:" + duration);
+                    if (debugLog.isInfoEnabled())
+                    {
+                        debugLog.info("Update Report: CLIENT_ID:" + client + " 
RECEIVED:" + totalReceived + " DURATION:" + duration);
+                    }
+
+                    recordSlow(client, totalReceived);
 
+                    adjustDelay(client, totalReceived, duration);
 
-                    recordSlow(client, received);
 
-                    adjustDelay(client, received, duration);
+                    if (!_warmedup && _totalReceived / _batchSize / 
delays.size() == _warmUpBatches / 2)
+                    {
+                        _warmedup = true;
+                        _warmup.countDown();
+
+                    }
                 }
             }
             catch (JMSException e)
@@ -412,72 +476,220 @@
             }
         }
 
-        class Pair<X, Y>
+        CountDownLatch _warmup = new CountDownLatch(1);
+
+        int _warmUpBatches = 20;
+
+        int _numBatches = 10000;
+
+        //        long[] _timings = new long[_numBatches];
+        private boolean _running = true;
+
+
+        public void run()
         {
-            X item1;
-            Y item2;
+            log.info("Warming up");
 
-            Pair(X i1, Y i2)
+            doBatch(_warmUpBatches);
+
+            try
             {
-                item1 = i1;
-                item2 = i2;
+                //wait for warmup to complete.
+                _warmup.await();
+
+                //set delay to the average length of the batches
+                _delay = _totalDuration / _warmUpBatches / delays.size();
+
+                log.info("Warmup complete delay set : " + _delay
+                         + " based on _totalDuration: " + _totalDuration
+                         + " over no. batches: " + _warmUpBatches
+                         + " with client count: " + delays.size());
+
+                _totalDuration = 0L;
+                _totalReceived = 0L;
+                _sent = 0L;
             }
+            catch (InterruptedException e)
+            {
+                //
+            }
+
 
-            X getItem1()
+            doBatch(_numBatches);
+
+        }
+
+        private void doBatch(int batchSize)  // long[] timings,
+        {
+            TextMessage testMessage = null;
+            try
             {
-                return item1;
+                testMessage = _client.session[0].createTextMessage("start");
+
+
+                for (int batch = 0; batch < batchSize; batch++)
+//                while (_running)
+                {
+                    long start = System.nanoTime();
+
+                    testMessage.setText("start");
+                    _client.producer.send(testMessage);
+                    _rateAdapter.sentMessage();
+
+                    testMessage.setText("test");
+                    //start at 2 so start and end count as part of batch
+                    for (int m = 2; m < _batchSize; m++)
+                    {
+                        _client.producer.send(testMessage);
+                        _rateAdapter.sentMessage();
+                    }
+
+                    testMessage.setText("end");
+                    _client.producer.send(testMessage);
+                    _rateAdapter.sentMessage();
+
+                    long end = System.nanoTime();
+
+                    long sendtime = end - start;
+
+                    debugLog.info("Sent batch[" + batch + "](" + _batchSize + 
") in " + sendtime);//timings[batch]);
+
+                    if (batch % 10 == 0)
+                    {
+                        log.info("Sent Batch[" + batch + "](" + _batchSize + 
")" + status());
+                    }
+
+                    _rateAdapter.sleepBatch();
+
+                }
+            }
+            catch (JMSException e)
+            {
+                log.error("Runner ended");
+            }
+        }
+
+        private String status()
+        {
+            return " TotalDuration: " + _totalDuration + " for " + 
delays.size() + " consumers"
+                   + " Delay is " + _delay + " resulting in "
+                   + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / 
_batchSize) + "/msg" : _delay + "/batch");
+        }
+
+        private void sleepBatch()
+        {
+            if (checkForSlowClients())
+            {//if there were slow clients we have already slept so don't 
sleep again.
+                return;
             }
 
-            Y getItem2()
+            //Slow down if gap between send and received is too large
+            if (_sent - _totalReceived / delays.size() > _messageVariance)
             {
-                return item2;
+                //pause between batches.
+                debugLog.info("Sleeping to keep sent in check with received");
+                log.debug("Increaseing _delay as sending more than receiving");
+                _delay += TEN_MILLI_SEC;
+            }
+
+            //per batch sleep.. if sleep is too small to spread over the batch.
+            if (_delay <= TEN_MILLI_SEC * _batchSize)
+            {
+                sleepLong(_delay);
+            }
+            else
+            {
+                debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
             }
         }
 
-        Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, 
Long>>();
-        Long totalReceived = 0L;
-        Long totalDuration = 0L;
+        public void stop()
+        {
+            _running = false;
+        }
+
+        Map<String, Long> delays = new HashMap<String, Long>();
+        Long _totalReceived = 0L;
+        Long _totalDuration = 0L;
+        int _skipUpdate = 0;
 
-        private void adjustDelay(String client, long received, long duration)
+        /**
+         * Adjust the delay for sending messages based on this update from the 
client
+         *
+         * @param client        The client that sent this update
+         * @param totalReceived The number of messages that this client has 
received.
+         * @param duration      The time taken for the last batch of messages
+         */
+        private void adjustDelay(String client, long totalReceived, long 
duration)
         {
-            Pair<Long, Long> current = delays.get(client);
+            //Retrieve the current total time taken for this client.
+            Long currentTime = delays.get(client);
 
-            if (current == null)
+            // Add the new duration time to this client
+            if (currentTime == null)
             {
-                delays.put(client, new Pair<Long, Long>(received, duration));
+                currentTime = duration;
             }
             else
             {
-                //reduce totals
-                totalReceived -= current.getItem1();
-                totalDuration -= current.getItem2();
+                currentTime += duration;
             }
 
-            totalReceived += received;
-            totalDuration += duration;
+            delays.put(client, currentTime);
+
+
+            _totalReceived += _batchSize;
+            _totalDuration += duration;
 
-            long averageDuration = totalDuration / delays.size();
+            // Calculate the number of batches reported so far.
+            long batchCount = (_totalReceived / _batchSize);
 
-            long diff = Math.abs(_delay - averageDuration);
+            //calculate average duration across clients per batch
+            long averageDuration = _totalDuration / delays.size() / batchCount;
+
+            //calculate the difference between current send delay and average 
report delay
+            long diff = (duration) - averageDuration;
+
+            if (debugLog.isInfoEnabled())
+            {
+                debugLog.info("TotalDuration:" + _totalDuration + " for " + 
delays.size() + " consumers"
+                              + " on batch: " + batchCount
+                              + " Batch Duration: " + duration
+                              + " Average: " + averageDuration
+                              + " so diff: " + diff + " for : " + client
+                              + " Delay is " + _delay + " resulting in "
+                              + ((_delay > TEN_MILLI_SEC * _batchSize)
+                                 ? (_delay / _batchSize) + "/msg" : _delay + 
"/batch"));
+            }
 
             //if the averageDuration differs from the current by more than the 
specified variane then adjust delay.
-            if (diff > _variance)
+            if (Math.abs(diff) > _timeVariance)
             {
-                if (averageDuration > _delay)
+
+                // if the _delay is larger than the required duration to 
send report
+                // speed up
+                if (diff > TEN_MILLI_SEC)
                 {
-                    // we can go faster
-                    _delay -= diff;
+                    _delay -= TEN_MILLI_SEC;
+
                     if (_delay < 0)
                     {
                         _delay = 0;
+                        debugLog.info("Reset _delay to 0");
+                        delayStable();
+                    }
+                    else
+                    {
+                        delayChanged();
                     }
+
                 }
-                else
+                else if (diff < 0) // diff < 0 diff cannot be 0 as it is > 
_timeVariance
                 {
-                    // we need to slow down
-                    _delay += diff;
+                    // the report took longer
+                    _delay += TEN_MILLI_SEC;
+                    delayChanged();
                 }
-                delayChanged();
             }
             else
             {
@@ -486,11 +698,16 @@
 
         }
 
+        /** Reset the number of iterations before we say the delay has 
stabilised. */
         private void delayChanged()
         {
             _delayShifting = REPORTS_WITHOUT_CHANGE;
         }
 
+        /**
+         * Record the fact that delay has stabilised. If delay has stabilised 
for REPORTS_WITHOUT_CHANGE then it will
+         * output "Delay stabilised".
+         */
         private void delayStable()
         {
             _delayShifting--;
@@ -498,14 +715,20 @@
             if (_delayShifting < 0)
             {
                 _delayShifting = 0;
-                log.info("Delay stabilised:" + _delay);
+                log.debug("Delay stabilised:" + _delay);
             }
         }
 
-        // Record Slow clients
+        /**
+         * Checks that the client has received enough messages. If the client 
has fallen behind then they are put in the
+         * _slowClients lists which will increase the delay.
+         *
+         * @param client   The client identifier to check
+         * @param received the number of messages received by that client
+         */
         private void recordSlow(String client, long received)
         {
-            if (received < (_sent - _variance))
+            if (received < (_sent - _messageVariance))
             {
                 _slowClients.put(client, received);
             }
@@ -515,20 +738,49 @@
             }
         }
 
+        /** Increment the number of sent messages and then sleep, if required. 
*/
         public void sentMessage()
         {
-            if (_sent % updateInterval == 0)
+
+            _sent++;
+
+            if (_delay > TEN_MILLI_SEC * _batchSize)
             {
+                long batchDelay = _delay / _batchSize;
+                // less than 10ms sleep doesn't always work.
+                // _delay is in nano seconds
+//                if (batchDelay < (TEN_MILLI_SEC))
+//                {
+//                    sleep(0, (int) batchDelay);
+//                }
+//                else
+                {
+//                    if (batchDelay < 30000000000L)
+                    {
+                        sleepLong(batchDelay);
+                    }
+                }
+            }
+        }
+
 
+        /**
+         * Check at the end of each batch and pause sending messages to allow 
slow clients to catch up.
+         *
+         * @return true if there were slow clients that caught up.
+         */
+        private boolean checkForSlowClients()
+        {
+            if (_sent % _batchSize == 0)
+            {
                 // Cause test to pause when we have slow
                 if (!_slowClients.isEmpty() || NO_CLIENTS)
                 {
-                    log.info("Pausing for slow clients");
-
-                    //_delay <<= 1;
+                    debugLog.info("Pausing for slow clients:" + 
_slowClients.entrySet().toArray());
 
                     while (!_slowClients.isEmpty())
                     {
+                        debugLog.info(_slowClients.size() + " slow clients.");
                         sleep(PAUSE_SLEEP);
                     }
 
@@ -537,45 +789,67 @@
                         sleep(NO_CLIENT_SLEEP);
                     }
 
-                    log.debug("Continuing");
-                    return;
+                    debugLog.debug("Continuing");
+                    return true;
                 }
                 else
                 {
-                    log.info("Delay:" + _delay);
+                    debugLog.info("Delay:" + _delay);
                 }
+
             }
 
-            _sent++;
+            return false;
+        }
 
-            if (_delay > 0)
-            {
-                // less than 10ms sleep doesn't work.
-                // _delay is in nano seconds
-                if (_delay < 1000000)
-                {
-                    sleep(0, (int) _delay);
-                }
-                else
-                {
-                    if (_delay < 30000000000L)
-                    {
-                        sleep(_delay / 1000000, (int) (_delay % 1000000));
-                    }
-                }
-            }
+        /**
+         * Sleep normally takes milliseconds; this allows the use of a 
nanosecond value.
+         *
+         * @param delay nanoseconds to sleep for.
+         */
+        private void sleepLong(long delay)
+        {
+            sleep(delay / 1000000, (int) (delay % 1000000));
         }
 
+        /**
+         * Sleep for the specified milliseconds.
+         * @param sleep milliseconds to sleep for.
+         */
         private void sleep(long sleep)
         {
             sleep(sleep, 0);
         }
 
+        /**
+         * Perform the sleep, swallowing any InterruptedException.
+         *
+         * NOTE: If a sleep request is > 10s then the sleep is reset to 5s
+         *
+         * @param milli milliseconds to sleep for
+         * @param nano additional nanoseconds to sleep for
+         */
         private void sleep(long milli, int nano)
         {
             try
             {
-                log.debug("Sleep:" + milli + ":" + nano);
+                debugLog.debug("Sleep:" + milli + ":" + nano);
+                if (milli > 10000)
+                {
+
+                    if (_delay == milli)
+                    {
+                        _totalDuration = _totalReceived / _batchSize * 
EXPECTED_TIME_PER_BATCH;
+                        debugLog.error("Sleeping for more than 10 seconds 
adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration);
+                    }
+                    else
+                    {
+                        debugLog.error("Sleeping for more than 10 seconds 
adjusted to 5s!:" + milli / 1000 + "s");
+                    }
+
+                    milli = 5000;
+                }
+
                 Thread.sleep(milli, nano);
             }
             catch (InterruptedException e)
@@ -583,6 +857,12 @@
                 //
             }
         }
+
+        public void setClient(SustainedTestClient client)
+        {
+            _client = client;
+        }
     }
 
 }
+

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java?view=diff&rev=550509&r1=550508&r2=550509
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
 Mon Jun 25 07:16:30 2007
@@ -78,11 +78,12 @@
         Map<String, Object> testConfig = new HashMap<String, Object>();
         testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
         testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
-        //testConfig.put("SUSTAINED_MSG_RATE", 10);
-        testConfig.put("SUSTAINED_NUM_RECEIVERS", 2);
-        testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25);
+        testConfig.put("SUSTAINED_NUM_RECEIVERS", 
Integer.getInteger("numReceives", 2));
+        testConfig.put("SUSTAINED_UPDATE_INTERVAL", 
Integer.getInteger("batchSize", 1000));
         testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
-        testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE);
+        testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", 
AMQSession.AUTO_ACKNOWLEDGE));
+
+        log.info("Created Config: " + testConfig.entrySet().toArray());
 
         sequenceTest(testConfig);
     }


Reply via email to