Author: rupertlssmith
Date: Thu Aug 16 03:18:08 2007
New Revision: 566644

URL: http://svn.apache.org/viewvc?view=rev&rev=566644
Log:
Added distributed clock synchronization using UDP datagrams.

Added:
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
Modified:
    
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
    incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java

Modified: 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
 (original)
+++ 
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
 Thu Aug 16 03:18:08 2007
@@ -29,8 +29,13 @@
 import org.apache.qpid.sustained.SustainedClientTestCase;
 import org.apache.qpid.test.framework.MessagingTestConfigProperties;
 import org.apache.qpid.test.framework.TestUtils;
+import org.apache.qpid.test.framework.clocksynch.ClockSynchThread;
+import org.apache.qpid.test.framework.clocksynch.ClockSynchronizer;
+import org.apache.qpid.test.framework.clocksynch.UDPClockSynchronizer;
 import org.apache.qpid.test.framework.distributedcircuit.TestClientCircuitEnd;
 
+import uk.co.thebadgerset.junit.extensions.SleepThrottle;
+import uk.co.thebadgerset.junit.extensions.Throttle;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
 
@@ -41,7 +46,7 @@
 /**
  * Implements a test client as described in the interop testing spec
  * 
(http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification).
 A test client is an agent that
- * reacts to control message sequences send by the test [EMAIL PROTECTED] 
org.apache.qpid.test.framework.distributedtesting.Coordinator}.
+ * reacts to control message sequences send by the test [EMAIL PROTECTED] 
Coordinator}.
  *
  * <p/><table><caption>Messages Handled by SustainedTestClient</caption>
  * <tr><th> Message               <th> Action
@@ -51,6 +56,7 @@
  * <tr><td> Start                 <td> Send test messages defined by test 
parameters. Send report on messages sent.
  * <tr><td> Status Request        <td> Send report on messages received.
  * <tr><td> Terminate             <td> Terminate the test client.
+ * <tr><td> ClockSynch            <td> Synch clock against the supplied UDP 
address.
  * </table>
  *
  * <p><table id="crc"><caption>CRC Card</caption>
@@ -104,6 +110,9 @@
     /** This flag indicates that the test client should attempt to join the 
currently running test case on start up. */
     protected boolean join;
 
+    /** Holds the clock synchronizer for the test node. */
+    ClockSynchThread clockSynchThread;
+
     /**
      * Creates a new interop test client, listenting to the specified broker 
and virtual host, with the specified client
      * identifying name.
@@ -404,11 +413,30 @@
             }
             else if ("TERMINATE".equals(controlType))
             {
-                log.info("Received termination instruction from coordinator.");
+                console.info("Received termination instruction from 
coordinator.");
 
                 // Is a cleaner shutdown needed?
                 connection.close();
                 System.exit(0);
+            }
+            else if ("CLOCK_SYNCH".equals(controlType))
+            {
+                log.debug("Received clock synch command.");
+                String address = message.getStringProperty("ADDRESS");
+
+                log.debug("address = " + address);
+
+                // Re-create (if necessary) and start the clock synch thread 
to synch the clock every ten seconds.
+                if (clockSynchThread != null)
+                {
+                    clockSynchThread.terminate();
+                }
+
+                SleepThrottle throttle = new SleepThrottle();
+                throttle.setRate(0.1f);
+
+                clockSynchThread = new ClockSynchThread(new 
UDPClockSynchronizer(address), throttle);
+                clockSynchThread.start();
             }
             else
             {

Modified: incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh (original)
+++ incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh Thu Aug 16 
03:18:08 2007
@@ -19,5 +19,5 @@
 #
 for x in `seq 1 $1`;
 do
-    java -cp 
qpid-integrationtests-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar 
-Dlog4j.configuration=file:/home/rupert/qpid/trunk/qpid/java/etc/mylog4j.xml 
org.apache.qpid.test.framework.distributedtesting.TestClient -n $x &
+    java -cp 
qpid-integrationtests-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar 
-Dlog4j.configuration=file:/home/rupert/qpid/trunk/qpid/java/etc/mylog4j.xml 
org.apache.qpid.test.framework.distributedtesting.TestClient -n java$x &
 done

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
 Thu Aug 16 03:18:08 2007
@@ -54,7 +54,7 @@
      */
     public void onException(JMSException e)
     {
-        log.debug("public void onException(JMSException e): called");
+        log.debug("public void onException(JMSException e): called", e);
 
         exceptions.add(e);
     }

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
 Thu Aug 16 03:18:08 2007
@@ -57,7 +57,7 @@
      */
     public void onMessage(Message message)
     {
-        log.debug("public void onMessage(Message message): called");
+        // log.debug("public void onMessage(Message message): called");
 
         numMessages.getAndIncrement();
     }

Added: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java?view=auto&rev=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
 (added)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
 Thu Aug 16 03:18:08 2007
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.clocksynch;
+
+/**
+ * ClockSynchFailureException represents failure of a [EMAIL PROTECTED] 
ClockSynchronizer} to achieve synchronization. This could
+ * be because a reference signal is not available, or because a desired 
accurracy cannot be attained, for example.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to achieve synchronization.
+ * </table>
+ */
+public class ClockSynchFailureException extends Exception
+{
+    /**
+     * Creates a clock synch failure exception.
+     *
+     * @param message The detail message (which is saved for later retrieval 
by the [EMAIL PROTECTED] #getMessage()} method).
+     * @param cause   The cause (which is saved for later retrieval by the 
[EMAIL PROTECTED] #getCause()} method).  (A <tt>null</tt>
+     *                value is permitted, and indicates that the cause is 
nonexistent or unknown.)
+     */
+    public ClockSynchFailureException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}

Added: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java?view=auto&rev=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
 (added)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
 Thu Aug 16 03:18:08 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.clocksynch;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.ShutdownHookable;
+import uk.co.thebadgerset.junit.extensions.Throttle;
+
+/**
+ * ClockSynchThread is a convenient utility for running a thread that 
periodically synchronizes the clock against
+ * a reference. Supply it with a [EMAIL PROTECTED] ClockSynchronizer} and a 
[EMAIL PROTECTED] Throttle} and it will continually keep the
+ * clock up-to-date at a rate determined by the throttle.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Continually sychronize the clock at a throttled rate.
+ * </table>
+ */
+public class ClockSynchThread extends Thread implements ShutdownHookable
+{
+    /** Used for debugging. */
+    private static final Logger log = Logger.getLogger(ClockSynchThread.class);
+
+    /** Holds the clock syncher for the synch thread. */
+    private ClockSynchronizer clockSyncher;
+
+    /** Holds the throttle to limit the synch rate. */
+    private Throttle throttle;
+
+    /** Flag to indicate that the periodic clock syncher should keep running. 
*/
+    boolean doSynch = true;
+
+    /**
+     * Creates a clock synchronizer thread from a clock synchronizer and a 
throttle.
+     *
+     * @param syncher  The clock synchronizer.
+     * @param throttle The throttle.
+     */
+    public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle)
+    {
+        this.clockSyncher = syncher;
+        this.throttle = throttle;
+    }
+
+    /**
+     * Terminates the synchronization thread.
+     */
+    public void terminate()
+    {
+        doSynch = false;
+    }
+
+    /**
+     * Continually updates the clock, until [EMAIL PROTECTED] #terminate()} is 
called.
+     */
+    public void run()
+    {
+        while (doSynch)
+        {
+            // Perform a clock clockSynch.
+            try
+            {
+                // Wait controlled by the throttle before doing the next synch.
+                throttle.throttle();
+
+                clockSyncher.synch();
+                log.debug("Clock synched, delta = " + clockSyncher.getDelta() 
+ ", epsilon = " + clockSyncher.getEpsilon()
+                    + ".");
+            }
+            // Terminate the synch thread if the synchronization cannot be 
achieved.
+            catch (ClockSynchFailureException e)
+            {
+                log.debug("Cannot synchronize the clock (reference service may 
be down). Terminating the synch thread.");
+                doSynch = false;
+            }
+        }
+    }
+
+    /**
+     * Gets the clock synchronizer that is kept continually up to date.
+     *
+     * @return The clock synchronizer that is kept continually up to date.
+     */
+    public ClockSynchronizer getClockSyncher()
+    {
+        return clockSyncher;
+    }
+
+    /**
+     * Supplies a shutdown hook, that terminates the synching thread.
+     *
+     * @return The shut down hook.
+     */
+    public Thread getShutdownHook()
+    {
+        return new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        doSynch = false;
+                    }
+                });
+    }
+}

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
 Thu Aug 16 03:18:08 2007
@@ -41,8 +41,10 @@
 {
     /**
      * The slave side should call this to copute a clock delta with the 
reference.
+     *
+     * @throws ClockSynchFailureException If synchronization cannot be 
achieved.
      */
-    public void synch();
+    public void synch() throws ClockSynchFailureException;
 
     /**
      * Gets the clock delta in nano seconds.

Added: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java?view=auto&rev=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
 (added)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
 Thu Aug 16 03:18:08 2007
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.clocksynch;
+
+/**
+ * LocalClockSynchronizer is a fake [EMAIL PROTECTED] ClockSynchronizer} that 
simply calls System.nanoTime(). It exists so that
+ * the same tests can be run distributed or locally, taking timings against 
the ClockSynchronizer interface without
+ * being aware of how they are being run.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Supply the local clock with no delta.
+ * </table>
+ */
+public class LocalClockSynchronizer implements ClockSynchronizer
+{
+    /**
+     * The slave side should call this to copute a clock delta with the 
reference.
+     *
+     * @throws 
org.apache.qpid.test.framework.clocksynch.ClockSynchFailureException
+     *          If synchronization cannot be achieved.
+     */
+    public void synch() throws ClockSynchFailureException
+    { }
+
+    /**
+     * Gets the clock delta in nano seconds.
+     *
+     * @return The clock delta in nano seconds.
+     */
+    public long getDelta()
+    {
+        return 0L;
+    }
+
+    /**
+     * Gets an estimate of the clock error in nan seconds.
+     *
+     * @return An estimate of the clock error in nan seconds.
+     */
+    public long getEpsilon()
+    {
+        return 0L;
+    }
+
+    /**
+     * Gets the local clock time with any computed delta added in.
+     *
+     * @return The local clock time with any computed delta added in.
+     */
+    public long nanoTime()
+    {
+        return System.nanoTime();
+    }
+}

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
 Thu Aug 16 03:18:08 2007
@@ -20,10 +20,14 @@
  */
 package org.apache.qpid.test.framework.clocksynch;
 
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.net.*;
 import java.nio.ByteBuffer;
 
+import uk.co.thebadgerset.junit.extensions.ShutdownHookable;
+
 /**
  * UDPClockReference supplies a refernce clock signal (generated from 
System.nanoTime()).
  *
@@ -36,8 +40,11 @@
  *
  * @todo Errors rethrown as runtimes, or silently terminate the service. Could 
add better error handling if needed.
  */
-public class UDPClockReference implements Runnable
+public class UDPClockReference implements Runnable, ShutdownHookable
 {
+    /** Used for debugging. */
+    // private static final Logger log = 
Logger.getLogger(UDPClockReference.class);
+
     /** Defines the timeout to use when polling the socket for time requests. 
*/
     private static final int TIMEOUT = 200;
 

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
 Thu Aug 16 03:18:08 2007
@@ -20,12 +20,15 @@
  */
 package org.apache.qpid.test.framework.clocksynch;
 
+import org.apache.log4j.Logger;
+
 import uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
 import java.io.IOException;
 import java.net.*;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  * UDPClockSynchronizer is a [EMAIL PROTECTED] ClockSynchronizer} that sends 
pings as UDP datagrams, and uses the following simple
@@ -43,7 +46,7 @@
  * <li>The Slave repeats steps 2 through 4, 15 more times.</li>
  * <li>The results of the packet receipts are accumulated and sorted in 
lowest-latency to highest-latency order. The
  *     median latency is determined by picking the mid-point sample from this 
ordered list.</li>
- * <li>All samples above approximately 1 standard-deviation from the median 
are discarded and the remaining samples
+ * <li>All samples outside 1 standard-deviation from the median are discarded 
and the remaining samples
  *     are averaged using an arithmetic mean.</li>
  * </ol>
  *
@@ -59,6 +62,12 @@
  */
 public class UDPClockSynchronizer implements ClockSynchronizer
 {
+    /** Used for debugging. */
+    // private static final Logger log = 
Logger.getLogger(UDPClockSynchronizer.class);
+
+    /** Defines the timeout to use when waiting for responses to time 
requests. */
+    private static final int TIMEOUT = 50;
+
     /** The clock delta. */
     private long delta = 0L;
 
@@ -71,6 +80,9 @@
     /** Holds the socket to communicate with the reference service over. */
     private DatagramSocket socket;
 
+    /** Used to control the shutdown in the main test loop. */
+    private static boolean doSynch = true;
+
     /**
      * Creates a clock synchronizer against the specified address for the 
reference.
      *
@@ -90,19 +102,26 @@
 
     /**
      * The slave side should call this to compute a clock delta with the 
reference.
+     *
+     * @throws ClockSynchFailureException If synchronization cannot be 
achieved, due to unavailability of the reference
+     *                                    time service.
      */
-    public void synch()
+    public void synch() throws ClockSynchFailureException
     {
         try
         {
             socket = new DatagramSocket();
+            socket.setSoTimeout(TIMEOUT);
 
             // Synchronize on a single ping, to get the clock into the right 
ball-park.
             synch(1);
 
-            // Synchronize on 15 pings for greater accuracy.
+            // Synchronize on 15 pings.
             synch(15);
 
+            // And again, for greater accuracy, on 31.
+            synch(31);
+
             socket.close();
         }
         catch (SocketException e)
@@ -115,9 +134,14 @@
      * Updates the synchronization delta by performing the specified number of 
reference clock requests.
      *
      * @param n The number of reference clock request cycles to perform.
+     *
+     * @throws ClockSynchFailureException If synchronization cannot be 
achieved, due to unavailability of the reference
+     *                                    time service.
      */
-    protected void synch(int n)
+    protected void synch(int n) throws ClockSynchFailureException
     {
+        // log.debug("protected void synch(int n = " + n + "): called");
+
         // Create an array of deltas by performing n reference pings.
         long[] delta = new long[n];
 
@@ -130,16 +154,23 @@
         long median = median(delta);
         long sd = standardDeviation(delta);
 
+        // log.debug("median = " + median);
+        // log.debug("sd = " + sd);
+
         long[] tempDeltas = new long[n];
         int count = 0;
 
         for (int i = 0; i < n; i++)
         {
-            if (delta[i] <= (median + sd))
+            if ((delta[i] <= (median + sd)) && (delta[i] >= (median - sd)))
             {
                 tempDeltas[count] = delta[i];
                 count++;
             }
+            else
+            {
+                // log.debug("Rejected: " + delta[i]);
+            }
         }
 
         System.arraycopy(tempDeltas, 0, delta, 0, count);
@@ -150,6 +181,8 @@
         // Estimate the error as the standard deviation of the remaining 
deltas.
         this.epsilon = standardDeviation(delta);
 
+        // log.debug("this.delta = " + this.delta);
+        // log.debug("this.epsilon = " + this.epsilon);
     }
 
     /**
@@ -157,31 +190,74 @@
      * This is computed as the half-latency of the requst cycle, plus the 
reference clock, minus the local clock.
      *
      * @return The estimated clock delta.
+     *
+     * @throws ClockSynchFailureException If the reference service is not 
responding.
      */
-    protected long ping()
+    protected long ping() throws ClockSynchFailureException
     {
+        // log.debug("protected long ping(): called");
+
         try
         {
             byte[] buf = new byte[256];
-            DatagramPacket packet = new DatagramPacket(buf, buf.length, 
referenceAddress, UDPClockReference.REFERENCE_PORT);
 
-            // Start timing the request latency.
-            long start = nanoTime();
+            boolean timedOut = false;
+            long start = 0L;
+            long refTime = 0L;
+            long localTime = 0L;
+            long latency = 0L;
+            int failCount = 0;
+
+            // Keep trying the ping until it gets a response, or 10 tries in a 
row all time out.
+            do
+            {
+                // Start timing the request latency.
+                start = nanoTime();
+
+                // Get the reference time.
+                DatagramPacket packet =
+                    new DatagramPacket(buf, buf.length, referenceAddress, 
UDPClockReference.REFERENCE_PORT);
+                socket.send(packet);
+                packet = new DatagramPacket(buf, buf.length);
+
+                timedOut = false;
+
+                try
+                {
+                    socket.receive(packet);
+                }
+                catch (SocketTimeoutException e)
+                {
+                    timedOut = true;
+                    failCount++;
+
+                    continue;
+                }
+
+                ByteBuffer bbuf = ByteBuffer.wrap(packet.getData());
+                refTime = bbuf.getLong();
+
+                // Stop timing the request latency.
+                localTime = nanoTime();
+                latency = localTime - start;
+
+                // log.debug("refTime = " + refTime);
+                // log.debug("localTime = " + localTime);
+                // log.debug("start = " + start);
+                // log.debug("latency = " + latency);
+                // log.debug("delta = " + ((latency / 2) + (refTime - 
localTime)));
 
-            // Get the reference time.
-            socket.send(packet);
-            packet = new DatagramPacket(buf, buf.length);
-            socket.receive(packet);
-
-            ByteBuffer bbuf = ByteBuffer.wrap(packet.getData());
-            long refTime = bbuf.getLong();
-
-            // Stop timing the request latency.
-            long localTime = nanoTime();
-            long end = localTime - start;
+            }
+            while (timedOut && (failCount < 10));
+
+            // Fail completely if the fail count is too high.
+            if (failCount >= 10)
+            {
+                throw new ClockSynchFailureException("Clock reference not 
responding.", null);
+            }
 
             // Estimate delta as (ref clock + half-latency) - local clock.
-            return ((end - start) / 2) + refTime - localTime;
+            return (latency / 2) + (refTime - localTime);
         }
         catch (IOException e)
         {
@@ -228,18 +304,31 @@
      */
     public static long median(long[] values)
     {
+        // log.debug("public static long median(long[] values = " + 
Arrays.toString(values) + "): called");
+
+        long median;
+
+        // Order the list of values.
+        long[] orderedValues = new long[values.length];
+        System.arraycopy(values, 0, orderedValues, 0, values.length);
+        Arrays.sort(orderedValues);
+
         // Check if the median is computed from a pair of middle value.
-        if ((values.length % 2) == 0)
+        if ((orderedValues.length % 2) == 0)
         {
-            int middle = values.length / 2;
+            int middle = orderedValues.length / 2;
 
-            return (values[middle] + values[middle - 1]) / 2;
+            median = (orderedValues[middle] + orderedValues[middle - 1]) / 2;
         }
         // The median is computed from a single middle value.
         else
         {
-            return values[values.length / 2];
+            median = orderedValues[orderedValues.length / 2];
         }
+
+        // log.debug("median = " + median);
+
+        return median;
     }
 
     /**
@@ -251,6 +340,8 @@
      */
     public static long mean(long[] values)
     {
+        // log.debug("public static long mean(long[] values = " + 
Arrays.toString(values) + "): called");
+
         long total = 0L;
 
         for (long value : values)
@@ -258,7 +349,11 @@
             total += value;
         }
 
-        return total / values.length;
+        long mean = total / values.length;
+
+        // log.debug("mean = " + mean);
+
+        return mean;
     }
 
     /**
@@ -270,16 +365,23 @@
      */
     public static long variance(long[] values)
     {
+        // log.debug("public static long variance(long[] values = " + 
Arrays.toString(values) + "): called");
+
         long mean = mean(values);
 
         long totalVariance = 0;
 
         for (long value : values)
         {
-            totalVariance += (value - mean) ^ 2;
+            long diff = (value - mean);
+            totalVariance += diff * diff;
         }
 
-        return totalVariance / values.length;
+        long variance = totalVariance / values.length;
+
+        // log.debug("variance = " + variance);
+
+        return variance;
     }
 
     /**
@@ -291,7 +393,13 @@
      */
     public static long standardDeviation(long[] values)
     {
-        return Double.valueOf(Math.sqrt(variance(values))).longValue();
+        // log.debug("public static long standardDeviation(long[] values = " + 
Arrays.toString(values) + "): called");
+
+        long sd = Double.valueOf(Math.sqrt(variance(values))).longValue();
+
+        // log.debug("sd = " + sd);
+
+        return sd;
     }
 
     /**
@@ -314,11 +422,43 @@
         // Create a clock synchronizer.
         UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address);
 
-        // Perform a clock clockSyncher.
-        clockSyncher.synch();
+        // Set up a shutdown hook for it.
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        doSynch = false;
+                    }
+                }));
+
+        // Repeat the clock synching until the user kills the progam.
+        while (doSynch)
+        {
+            // Perform a clock clockSynch.
+            try
+            {
+                clockSyncher.synch();
 
-        // Print out the clock delta and estimate of the error.
-        System.out.println("Delta = " + clockSyncher.getDelta());
-        System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+                // Print out the clock delta and estimate of the error.
+                System.out.println("Delta = " + clockSyncher.getDelta());
+                System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+
+                try
+                {
+                    Thread.sleep(250);
+                }
+                catch (InterruptedException e)
+                {
+                    // Restore the interrupted status and terminate the loop.
+                    Thread.currentThread().interrupt();
+                    doSynch = false;
+                }
+            }
+            // Terminate if the reference time service is unavailable.
+            catch (ClockSynchFailureException e)
+            {
+                doSynch = false;
+            }
+        }
     }
 }

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
 Thu Aug 16 03:18:08 2007
@@ -63,15 +63,12 @@
  *       to register results and timings for the test. This must work in such 
a way that a new test cycle can be started
  *       without waiting for the results of the old one to come in.
  *
- * @todo Test circuits to be created per test thread, not per test method 
call. Per-thread setup and tear down to be
- *       reposible for circuit creation and clean up. Many individual test 
method calls to run over the same circuit.
- *       Important, otherwise test results will be skewed by circuit creation 
overheads.
- *
+ * @todo Add in setting of timing controller, from timing aware test cases.
  */
 public class DistributedCircuitImpl implements Circuit, TimingControllerAware
 {
     /** Used for debugging purposes. */
-    private static Logger log = Logger.getLogger(DistributedCircuitImpl.class);
+    private static final Logger log = 
Logger.getLogger(DistributedCircuitImpl.class);
 
     /** Holds the conversation factory over which to coordinate the test. */
     protected ConversationFactory conversationFactory;
@@ -305,7 +302,8 @@
      *       differ though, as the final report is used to apply assertions, 
and the ongoing report is just for
      *       periodic timing results... In which case, maybe there needs to be 
a way for the onMessage method
      *       to process just some of the incoming messages, and forward the 
rest on to the conversion helper, as
-     *       a sort of pre-conversation helper filter?
+     *       a sort of pre-conversation helper filter? Make conversation 
expose its onMessage method (it should
+     *       already) and allow another delivery thread to filter the incoming 
messages to the conversation.
      */
     public void check()
     {
@@ -365,7 +363,7 @@
                                 // Apply receiver assertions to pass/fail the 
tests.
 
                                 // Log the test timings on the asynchronous 
test timing controller.
-                                try
+                                /*try
                                 {
                                     timingController.completeTest(true, 
messageCount, testTime);
                                 }
@@ -374,7 +372,7 @@
                                 catch (InterruptedException e)
                                 {
                                     e.printStackTrace();
-                                }
+                                }*/
                             }
 
                             log.debug("All receiver test reports received.");

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
 Thu Aug 16 03:18:08 2007
@@ -54,7 +54,7 @@
 public class TestClientCircuitEnd implements CircuitEnd, 
TestClientControlledTest
 {
     /** Used for debugging. */
-    Logger log = Logger.getLogger(TestClientCircuitEnd.class);
+    private static final Logger log = 
Logger.getLogger(TestClientCircuitEnd.class);
 
     /** Holds the test parameters. */
     ParsedProperties testProps;

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
 Thu Aug 16 03:18:08 2007
@@ -31,6 +31,7 @@
 import org.apache.qpid.test.framework.MessagingTestConfigProperties;
 import org.apache.qpid.test.framework.TestClientDetails;
 import org.apache.qpid.test.framework.TestUtils;
+import org.apache.qpid.test.framework.clocksynch.UDPClockReference;
 import org.apache.qpid.test.framework.listeners.XMLTestListener;
 import org.apache.qpid.util.ConversationFactory;
 import org.apache.qpid.util.PrettyPrintingUtils;
@@ -47,6 +48,7 @@
 import javax.jms.*;
 
 import java.io.*;
+import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -219,6 +221,10 @@
                                 { "-csv", "Output test results in CSV 
format.", null, "false" },
                                 { "-xml", "Output test results in XML 
format.", null, "false" },
                                 {
+                                    "-trefaddr", "To specify an alternative to 
hostname for time singal reference.",
+                                    "address", "false"
+                                },
+                                {
                                     "c", "The number of tests to run 
concurrently.", "num", "false",
                                     MathUtils.SEQUENCE_REGEXP
                                 },
@@ -303,7 +309,7 @@
             if (testCaseClasses.isEmpty())
             {
                 throw new RuntimeException(
-                    "No test cases implementing DistributedTestCase were 
specified on the command line.");
+                    "No test cases implementing FrameworkBaseCase were 
specified on the command line.");
             }
 
             // Extract the names of all the test classes, to pass to the start 
method.
@@ -382,6 +388,21 @@
             log.debug("Got enlisted test client: " + client);
             console.info("Test node " + client.clientName + " available.");
         }
+
+        // Start the clock reference service running.
+        UDPClockReference clockReference = new UDPClockReference();
+        Thread clockRefThread = new Thread(clockReference);
+        registerShutdownHook(clockReference);
+        clockRefThread.start();
+
+        // Broadcast to all clients to synchronize their clocks against the 
coordinators clock reference.
+        Message clockSynchRequest = session.createMessage();
+        clockSynchRequest.setStringProperty("CONTROL_TYPE", "CLOCK_SYNCH");
+
+        String localAddress = 
InetAddress.getByName(InetAddress.getLocalHost().getHostName()).getHostAddress();
+        clockSynchRequest.setStringProperty("ADDRESS", localAddress);
+
+        conversation.send(controlTopic, clockSynchRequest);
 
         // Run the test in the suite using JUnit.
         TestResult result = null;

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
 Thu Aug 16 03:18:08 2007
@@ -83,6 +83,44 @@
         allClients = availableClients;
         conversationFactory = controlConversation;
         connection = controlConnection;
+
+        // Sign available clients up to the test.
+        for (Test test : getAllUnderlyingTests())
+        {
+            FrameworkBaseCase coordTest = (FrameworkBaseCase) test;
+
+            // Get all of the clients able to participate in the test.
+            Set<TestClientDetails> enlists = signupClients(coordTest);
+
+            // Check that there were some clients available.
+            if (enlists.size() == 0)
+            {
+                throw new RuntimeException("No clients to test with");
+            }
+
+            // Create a distributed test circuit factory for the test.
+            CircuitFactory circuitFactory = getTestSequencer();
+
+            // Set up the first client in the sender role, and the remainder 
in the receivers role.
+            Iterator<TestClientDetails> clients = enlists.iterator();
+            circuitFactory.setSender(clients.next());
+
+            while (clients.hasNext())
+            {
+                // Set the sending and receiving client details on the test 
case.
+                circuitFactory.setReceiver(clients.next());
+            }
+
+            // Pass down the connection to hold the coordinating conversation 
over.
+            circuitFactory.setConversationFactory(conversationFactory);
+
+            // If the current test case is a drop-in test, set it up as the 
currently running test for late joiners to
+            // add in to. Otherwise the current test field is set to null, to 
indicate that late joiners are not allowed.
+            currentTest = (coordTest instanceof DropInTest) ? coordTest : null;
+
+            // Execute the test case.
+            coordTest.setCircuitFactory(circuitFactory);
+        }
     }
 
     /**
@@ -100,8 +138,6 @@
     {
         log.debug("public void run(TestResult testResult): called");
 
-        Collection<Test> tests = testSuite.getAllUnderlyingTests();
-
         // Listen for late joiners on the control topic.
         try
         {
@@ -113,7 +149,7 @@
         }
 
         // Run all of the test cases in the test suite.
-        for (Test test : getAllUnderlyingTests())
+        /*for (Test test : getAllUnderlyingTests())
         {
             FrameworkBaseCase coordTest = (FrameworkBaseCase) test;
 
@@ -148,6 +184,13 @@
 
             // Execute the test case.
             coordTest.setCircuitFactory(circuitFactory);
+        }*/
+
+        // Run all of the test cases in the test suite.
+        for (Test test : getAllUnderlyingTests())
+        {
+            FrameworkBaseCase coordTest = (FrameworkBaseCase) test;
+
             coordTest.run(testResult);
 
             currentTest = null;


Reply via email to