Author: ritchiem
Date: Wed Jun 27 03:51:34 2007
New Revision: 551117

URL: http://svn.apache.org/viewvc?view=rev&rev=551117
Log:
Update to the sustained test to ensure late joining occurs correctly and 
improved stabilisation. Additional system properties now documented on wiki.
http://cwiki.apache.org/qpid/sustained-tests.html

Modified:
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
    
incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=551117&r1=551116&r2=551117
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
 Wed Jun 27 03:51:34 2007
@@ -252,7 +252,8 @@
     public static Connection createConnection(String connectionPropsResource, 
String clientID, String brokerUrl, String virtualHost)
     {
         log.debug("public static Connection createConnection(String 
connectionPropsResource = " + connectionPropsResource
-                  + ", String brokerUrl = " + brokerUrl + ", String 
virtualHost = " + virtualHost + "): called");
+                  + ", String brokerUrl = " + brokerUrl + ", String clientID = 
" + clientID
+                  + ", String virtualHost = " + virtualHost + " ): called");
 
         try
         {

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java?view=diff&rev=551117&r1=551116&r2=551117
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
 Wed Jun 27 03:51:34 2007
@@ -37,6 +37,7 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -91,7 +92,9 @@
 
 
     private static final long TEN_MILLI_SEC = 10000000;
-    private static final long FIVE_MILLI_SEC = 5000000;
+    private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+    private static final int LOG_UPATE_INTERVAL = 10;
+    private static final boolean SLEEP_PER_MESSAGE = 
Boolean.getBoolean("sleepPerMessage");
 
     /**
      * Should provide the name of the test case that this class implements. 
The exact names are defined in the interop
@@ -129,6 +132,7 @@
         String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
         String sendUpdateKey = 
assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
         int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+        String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
 
         if (debugLog.isDebugEnabled())
         {
@@ -150,7 +154,9 @@
                 session = new Session[1];
 
                 connection[0] =
-                        
org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
 org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+                        
org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+                                                                               
        clientName,
+                                                                               
        org.apache.qpid.interop.testclient.TestClient.brokerUrl,
                                                                                
        org.apache.qpid.interop.testclient.TestClient.virtualHost);
                 session[0] = connection[0].createSession(false, ackMode);
 
@@ -182,6 +188,7 @@
                 {
                     connection[i] =
                             
org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+                                                                               
            clientName,
                                                                                
            org.apache.qpid.interop.testclient.TestClient.brokerUrl,
                                                                                
            org.apache.qpid.interop.testclient.TestClient.virtualHost);
                     session[i] = connection[i].createSession(false, ackMode);
@@ -192,7 +199,7 @@
 
                     MessageConsumer consumer = 
session[i].createConsumer(sendDestination);
 
-                    consumer.setMessageListener(new 
SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], 
sendUpdateDestination));
+                    consumer.setMessageListener(new 
SustainedListener(clientName + "-" + i, _batchSize, session[i], 
sendUpdateDestination));
                 }
 
                 break;
@@ -347,7 +354,7 @@
                     _received++;
                     if (((TextMessage) message).getText().equals("start"))
                     {
-                        debugLog.info("Starting Batch");
+                        debugLog.debug("Starting Batch");
                         _startTime = System.nanoTime();
                     }
                     else if (((TextMessage) message).getText().equals("end"))
@@ -355,8 +362,8 @@
                         if (_startTime != null)
                         {
                             long currentTime = System.nanoTime();
-                            sendStatus(currentTime - _startTime, _received);
-                            debugLog.info("End Batch");
+                            sendStatus(currentTime - _startTime, _received, 
message.getIntProperty("BATCH"));
+                            debugLog.debug("End Batch");
                         }
                     }
                 }
@@ -373,28 +380,31 @@
          *
          * @param time     taken for the the last batch
          * @param received Total number of messages received.
-         *
+         * @param batchNumber the batch number
          * @throws JMSException if an error occurs during the send
          */
-        private void sendStatus(long time, long received) throws JMSException
+        private void sendStatus(long time, long received, int batchNumber) 
throws JMSException
         {
             Message updateMessage = _session.createTextMessage("update");
             updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
             updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
             updateMessage.setLongProperty("RECEIVED", received);
+            updateMessage.setIntProperty("BATCH", batchNumber);
             updateMessage.setLongProperty("DURATION", time);
 
             if (debugLog.isInfoEnabled())
             {
-                debugLog.info("**** SENDING [" + received / _batchSize + 
"]**** "
-                              + "CLIENT_ID:" + _client + " RECEIVED:" + 
received + " DURATION:" + time);
+                debugLog.info("**** SENDING [" + batchNumber + "]**** "
+                              + "CLIENT_ID:" + _client + " RECEIVED:" + 
received
+                              + " BATCH:" + batchNumber + " DURATION:" + time);
             }
 
             // Output on the main log.info the details of this batch
-            if (received / _batchSize % 10 == 0)
+            if (batchNumber % 10 == 0)
             {
-                log.info("Sending Report [" + received / _batchSize + "] "
-                         + "CLIENT_ID:" + _client + " RECEIVED:" + received + 
" DURATION:" + time);
+                log.info("Sending Report [" + batchNumber + "] "
+                         + "CLIENT_ID:" + _client + " RECEIVED:" + received
+                         + " BATCH:" + batchNumber + " DURATION:" + time);
             }
 
             _updater.send(updateMessage);
@@ -415,7 +425,7 @@
     class SustainedRateAdapter implements MessageListener, Runnable
     {
         private SustainedTestClient _client;
-        private long _messageVariance = 500; //no. messages to allow drifting
+        private long _batchVariance = 3; //no. batches to allow drifting
         private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between 
send and report delay (10ms)
         private volatile long _delay;   //in nanos
         private long _sent;
@@ -451,18 +461,23 @@
                     long duration = message.getLongProperty("DURATION");
                     long totalReceived = message.getLongProperty("RECEIVED");
                     String client = message.getStringProperty("CLIENT_ID");
+                    int batchNumber = message.getIntProperty("BATCH");
 
-                    if (debugLog.isInfoEnabled())
+                    if (debugLog.isInfoEnabled() && batchNumber % 
DEBUG_LOG_UPATE_INTERVAL == 0)
                     {
-                        debugLog.info("Update Report: CLIENT_ID:" + client + " 
RECEIVED:" + totalReceived + " DURATION:" + duration);
+                        debugLog.info("Update Report: CLIENT_ID:" + client + " 
RECEIVED:" + totalReceived
+                                      + " Recevied BATCH:" + batchNumber + " 
DURATION:" + duration);
                     }
 
-                    recordSlow(client, totalReceived);
-
-                    adjustDelay(client, totalReceived, duration);
+                    recordSlow(client, totalReceived, batchNumber);
 
+                    adjustDelay(client, batchNumber, duration);
 
-                    if (!_warmedup && _totalReceived / _batchSize / 
delays.size() == _warmUpBatches / 2)
+                    // Warm up completes when:
+                    // we haven't warmed up
+                    // and the number of batches sent to each client is at 
least half of the required warmup batches
+                    if (!_warmedup
+                        && (batchNumber >= _warmUpBatches))
                     {
                         _warmedup = true;
                         _warmup.countDown();
@@ -478,7 +493,7 @@
 
         CountDownLatch _warmup = new CountDownLatch(1);
 
-        int _warmUpBatches = 20;
+        int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
 
         int _numBatches = 10000;
 
@@ -527,12 +542,14 @@
                 testMessage = _client.session[0].createTextMessage("start");
 
 
-                for (int batch = 0; batch < batchSize; batch++)
+                for (int batch = 0; batch <= batchSize; batch++)
 //                while (_running)
                 {
                     long start = System.nanoTime();
 
                     testMessage.setText("start");
+                    testMessage.setIntProperty("BATCH", batch);
+
                     _client.producer.send(testMessage);
                     _rateAdapter.sentMessage();
 
@@ -552,9 +569,12 @@
 
                     long sendtime = end - start;
 
-                    debugLog.info("Sent batch[" + batch + "](" + _batchSize + 
") in " + sendtime);//timings[batch]);
+                    if (debugLog.isDebugEnabled())
+                    {
+                        debugLog.info("Sent batch[" + batch + "](" + 
_batchSize + ") in " + sendtime);//timings[batch]);
+                    }
 
-                    if (batch % 10 == 0)
+                    if (batch % LOG_UPATE_INTERVAL == 0)
                     {
                         log.info("Sent Batch[" + batch + "](" + _batchSize + 
")" + status());
                     }
@@ -583,23 +603,17 @@
                 return;
             }
 
-            //Slow down if gap between send and received is too large
-            if (_sent - _totalReceived / delays.size() > _messageVariance)
-            {
-                //pause between batches.
-                debugLog.info("Sleeping to keep sent in check with received");
-                log.debug("Increaseing _delay as sending more than receiving");
-                _delay += TEN_MILLI_SEC;
-            }
-
-            //per batch sleep.. if sleep is to small to spread over the batch.
-            if (_delay <= TEN_MILLI_SEC * _batchSize)
-            {
-                sleepLong(_delay);
-            }
-            else
+            if (!SLEEP_PER_MESSAGE)
             {
-                debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+                //per batch sleep.. if sleep is to small to spread over the 
batch.
+                if (_delay <= TEN_MILLI_SEC * _batchSize)
+                {
+                    sleepLong(_delay);
+                }
+                else
+                {
+                    debugLog.info("Not sleeping _delay > ten*batch is:" + 
_delay);
+                }
             }
         }
 
@@ -617,10 +631,10 @@
          * Adjust the delay for sending messages based on this update from the 
client
          *
          * @param client        The client that send this update
-         * @param totalReceived The number of messages that this client has 
received.
          * @param duration      The time taken for the last batch of messagse
+         * @param batchNumber   The reported batchnumber from the client 
          */
-        private void adjustDelay(String client, long totalReceived, long 
duration)
+        private void adjustDelay(String client, int batchNumber, long duration)
         {
             //Retrieve the current total time taken for this client.
             Long currentTime = delays.get(client);
@@ -637,23 +651,28 @@
 
             delays.put(client, currentTime);
 
+            long batchesSent = _sent / _batchSize;
+
+            // ensure we don't divide by zero
+            if (batchesSent == 0)
+            {
+                batchesSent = 1L;
+            }
 
             _totalReceived += _batchSize;
             _totalDuration += duration;
 
-            // Calculate the number of messages in the batch.
-            long batchCount = (_totalReceived / _batchSize);
-
             //calculate average duration accross clients per batch
-            long averageDuration = _totalDuration / delays.size() / batchCount;
+            long averageDuration = _totalDuration / delays.size() / 
batchesSent;
 
             //calculate the difference between current send delay and average 
report delay
             long diff = (duration) - averageDuration;
 
-            if (debugLog.isInfoEnabled())
+            if (debugLog.isInfoEnabled() && batchNumber % 
DEBUG_LOG_UPATE_INTERVAL == 0)
             {
-                debugLog.info("TotalDuration:" + _totalDuration + " for " + 
delays.size() + " consumers"
-                              + " on batch: " + batchCount
+                debugLog.info("TotalDuration:" + _totalDuration + " for " + 
delays.size() + " consumers."
+                              + " on batch: " + batchesSent
+                              + " received batch: " + batchNumber
                               + " Batch Duration: " + duration
                               + " Average: " + averageDuration
                               + " so diff: " + diff + " for : " + client
@@ -696,6 +715,16 @@
                 delayStable();
             }
 
+            // If we have a consumer that is behind with the batches.
+            if (batchesSent - batchNumber > _batchVariance)
+            {
+                debugLog.debug("Increasing _delay as sending more than 
receiving");
+
+                _delay += 2 * TEN_MILLI_SEC;
+                delayChanged();
+            }
+
+
         }
 
         /** Reset the number of iterations before we say the delay has 
stabilised. */
@@ -725,10 +754,11 @@
          *
          * @param client   The client identifier to check
          * @param received the number of messages received by that client
+         * @param batchNumber
          */
-        private void recordSlow(String client, long received)
+        private void recordSlow(String client, long received, int batchNumber)
         {
-            if (received < (_sent - _messageVariance))
+            if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
             {
                 _slowClients.put(client, received);
             }
@@ -761,6 +791,13 @@
                     }
                 }
             }
+            else
+            {
+                if (SLEEP_PER_MESSAGE && (_delay > 0))
+                {
+                    sleepLong(_delay / _batchSize);
+                }
+            }
         }
 
 
@@ -771,16 +808,38 @@
          */
         private boolean checkForSlowClients()
         {
-            if (_sent % _batchSize == 0)
+            // This will allways be true as we are running this at the end of 
each batchSize
+//            if (_sent % _batchSize == 0)
             {
                 // Cause test to pause when we have slow
                 if (!_slowClients.isEmpty() || NO_CLIENTS)
                 {
-                    debugLog.info("Pausing for slow clients:" + 
_slowClients.entrySet().toArray());
+
 
                     while (!_slowClients.isEmpty())
                     {
-                        debugLog.info(_slowClients.size() + " slow clients.");
+                        if (debugLog.isInfoEnabled()
+                            && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL 
== 0)
+                        {
+                            String clients = "";
+                            Iterator it = _slowClients.keySet().iterator();
+                            while (it.hasNext())
+                            {
+                                clients += it.next();
+                                if (it.hasNext())
+                                {
+                                    clients += ", ";
+                                }
+                            }
+                            debugLog.info("Pausing for slow clients:" + 
clients);
+                        }
+
+
+                        if (log.isDebugEnabled()
+                            && _sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+                        {
+                            log.debug(_slowClients.size() + " slow clients.");
+                        }
                         sleep(PAUSE_SLEEP);
                     }
 
@@ -794,7 +853,11 @@
                 }
                 else
                 {
-                    debugLog.info("Delay:" + _delay);
+                    if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+                    {
+                        log.info("Total Delay :" + _delay + " "
+                                 + (_delayShifting == 0 ? "Stablised" : "Not 
Stablised(" + _delayShifting + ")"));
+                    }
                 }
 
             }
@@ -825,7 +888,7 @@
          * Perform the sleep , swallowing any InteruptException.
          *
          * NOTE: If a sleep request is > 10s then reset only sleep for 5s
-         *  
+         *
          * @param milli to sleep for
          * @param nano sub miliseconds to sleep for
          */

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java?view=diff&rev=551117&r1=551116&r2=551117
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
 Wed Jun 27 03:51:34 2007
@@ -113,6 +113,7 @@
         setPropertiesOnMessage(assignSender, testProperties);
         assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
         assignSender.setStringProperty("ROLE", "SENDER");
+        assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
 
         senderConversation.send(senderControlTopic, assignSender);
 
@@ -170,6 +171,7 @@
             setPropertiesOnMessage(assignReceiver, _testProperties);
             assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
             assignReceiver.setStringProperty("ROLE", "RECEIVER");
+            assignReceiver.setStringProperty("CLIENT_NAME", 
"Sustained_RECEIVER_" + receiver.clientName);
 
             receiverConversation.send(receiverControlTopic, assignReceiver);
 

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml?view=diff&rev=551117&r1=551116&r2=551117
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml
 Wed Jun 27 03:51:34 2007
@@ -35,11 +35,12 @@
     <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
 
         <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) 
- %m%n"/>
+            <param name="ConversionPattern" value="%d %-5p (%F:%L) - %m%n"/>
+            <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} 
(%F:%L) - %m%n"/-->
         </layout>
     </appender>
 
-     <category name="SustainedTest">
+    <category name="SustainedTest">
         <priority value="${sustained.level}"/>
     </category>
 
@@ -48,8 +49,8 @@
     </category>
 
     <category name="org.apache.qpid.interop">
-         <priority value="${interop.logging.level}"/>
-     </category>
+        <priority value="${interop.logging.level}"/>
+    </category>
 
 
     <category name="org.apache.qpid.sustained">


Reply via email to