Author: rupertlssmith
Date: Thu Aug 16 03:18:08 2007
New Revision: 566644
URL: http://svn.apache.org/viewvc?view=rev&rev=566644
Log:
Added distributed clock synchronization using UDP datagrams.
Added:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
Modified:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
Modified:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
(original)
+++
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/test/framework/distributedtesting/TestClient.java
Thu Aug 16 03:18:08 2007
@@ -29,8 +29,13 @@
import org.apache.qpid.sustained.SustainedClientTestCase;
import org.apache.qpid.test.framework.MessagingTestConfigProperties;
import org.apache.qpid.test.framework.TestUtils;
+import org.apache.qpid.test.framework.clocksynch.ClockSynchThread;
+import org.apache.qpid.test.framework.clocksynch.ClockSynchronizer;
+import org.apache.qpid.test.framework.clocksynch.UDPClockSynchronizer;
import org.apache.qpid.test.framework.distributedcircuit.TestClientCircuitEnd;
+import uk.co.thebadgerset.junit.extensions.SleepThrottle;
+import uk.co.thebadgerset.junit.extensions.Throttle;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
@@ -41,7 +46,7 @@
/**
* Implements a test client as described in the interop testing spec
*
(http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification).
A test client is an agent that
- * reacts to control message sequences send by the test [EMAIL PROTECTED]
org.apache.qpid.test.framework.distributedtesting.Coordinator}.
+ * reacts to control message sequences send by the test [EMAIL PROTECTED]
Coordinator}.
*
* <p/><table><caption>Messages Handled by SustainedTestClient</caption>
* <tr><th> Message <th> Action
@@ -51,6 +56,7 @@
* <tr><td> Start <td> Send test messages defined by test
parameters. Send report on messages sent.
* <tr><td> Status Request <td> Send report on messages received.
* <tr><td> Terminate <td> Terminate the test client.
+ * <tr><td> ClockSynch <td> Synch clock against the supplied UDP
address.
* </table>
*
* <p><table id="crc"><caption>CRC Card</caption>
@@ -104,6 +110,9 @@
/** This flag indicates that the test client should attempt to join the
currently running test case on start up. */
protected boolean join;
+ /** Holds the clock synchronizer for the test node. */
+ ClockSynchThread clockSynchThread;
+
/**
* Creates a new interop test client, listenting to the specified broker
and virtual host, with the specified client
* identifying name.
@@ -404,11 +413,30 @@
}
else if ("TERMINATE".equals(controlType))
{
- log.info("Received termination instruction from coordinator.");
+ console.info("Received termination instruction from
coordinator.");
// Is a cleaner shutdown needed?
connection.close();
System.exit(0);
+ }
+ else if ("CLOCK_SYNCH".equals(controlType))
+ {
+ log.debug("Received clock synch command.");
+ String address = message.getStringProperty("ADDRESS");
+
+ log.debug("address = " + address);
+
+ // Re-create (if necessary) and start the clock synch thread
to synch the clock every ten seconds.
+ if (clockSynchThread != null)
+ {
+ clockSynchThread.terminate();
+ }
+
+ SleepThrottle throttle = new SleepThrottle();
+ throttle.setRate(0.1f);
+
+ clockSynchThread = new ClockSynchThread(new
UDPClockSynchronizer(address), throttle);
+ clockSynchThread.start();
}
else
{
Modified: incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
--- incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh (original)
+++ incubator/qpid/branches/M2/java/systests/etc/bin/testclients.sh Thu Aug 16
03:18:08 2007
@@ -19,5 +19,5 @@
#
for x in `seq 1 $1`;
do
- java -cp
qpid-integrationtests-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar
-Dlog4j.configuration=file:/home/rupert/qpid/trunk/qpid/java/etc/mylog4j.xml
org.apache.qpid.test.framework.distributedtesting.TestClient -n $x &
+ java -cp
qpid-integrationtests-1.0-incubating-M2-SNAPSHOT-all-test-deps.jar
-Dlog4j.configuration=file:/home/rupert/qpid/trunk/qpid/java/etc/mylog4j.xml
org.apache.qpid.test.framework.distributedtesting.TestClient -n java$x &
done
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
Thu Aug 16 03:18:08 2007
@@ -54,7 +54,7 @@
*/
public void onException(JMSException e)
{
- log.debug("public void onException(JMSException e): called");
+ log.debug("public void onException(JMSException e): called", e);
exceptions.add(e);
}
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
Thu Aug 16 03:18:08 2007
@@ -57,7 +57,7 @@
*/
public void onMessage(Message message)
{
- log.debug("public void onMessage(Message message): called");
+ // log.debug("public void onMessage(Message message): called");
numMessages.getAndIncrement();
}
Added:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java?view=auto&rev=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
(added)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchFailureException.java
Thu Aug 16 03:18:08 2007
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.clocksynch;
+
+/**
+ * ClockSynchFailureException represents failure of a [EMAIL PROTECTED]
ClockSynchronizer} to achieve synchronization. This could
+ * be because a reference signal is not available, or because a desired
accurracy cannot be attained, for example.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent failure to achieve synchronization.
+ * </table>
+ */
+public class ClockSynchFailureException extends Exception
+{
+ /**
+ * Creates a clock synch failure exception.
+ *
+ * @param message The detail message (which is saved for later retrieval
by the [EMAIL PROTECTED] #getMessage()} method).
+ * @param cause The cause (which is saved for later retrieval by the
[EMAIL PROTECTED] #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is
nonexistent or unknown.)
+ */
+ public ClockSynchFailureException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
Added:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java?view=auto&rev=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
(added)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchThread.java
Thu Aug 16 03:18:08 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.clocksynch;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.ShutdownHookable;
+import uk.co.thebadgerset.junit.extensions.Throttle;
+
+/**
+ * ClockSynchThread is a convenient utility for running a thread that
periodically synchronizes the clock against
+ * a reference. Supply it with a [EMAIL PROTECTED] ClockSynchronizer} and a
[EMAIL PROTECTED] Throttle} and it will continually keep the
+ * clock up-to-date at a rate determined by the throttle.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Continually sychronize the clock at a throttled rate.
+ * </table>
+ */
+public class ClockSynchThread extends Thread implements ShutdownHookable
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(ClockSynchThread.class);
+
+ /** Holds the clock syncher for the synch thread. */
+ private ClockSynchronizer clockSyncher;
+
+ /** Holds the throttle to limit the synch rate. */
+ private Throttle throttle;
+
+ /** Flag to indicate that the periodic clock syncher should keep running.
*/
+ boolean doSynch = true;
+
+ /**
+ * Creates a clock synchronizer thread from a clock synchronizer and a
throttle.
+ *
+ * @param syncher The clock synchronizer.
+ * @param throttle The throttle.
+ */
+ public ClockSynchThread(ClockSynchronizer syncher, Throttle throttle)
+ {
+ this.clockSyncher = syncher;
+ this.throttle = throttle;
+ }
+
+ /**
+ * Terminates the synchronization thread.
+ */
+ public void terminate()
+ {
+ doSynch = false;
+ }
+
+ /**
+ * Continually updates the clock, until [EMAIL PROTECTED] #terminate()} is
called.
+ */
+ public void run()
+ {
+ while (doSynch)
+ {
+ // Perform a clock clockSynch.
+ try
+ {
+ // Wait controlled by the throttle before doing the next synch.
+ throttle.throttle();
+
+ clockSyncher.synch();
+ log.debug("Clock synched, delta = " + clockSyncher.getDelta()
+ ", epsilon = " + clockSyncher.getEpsilon()
+ + ".");
+ }
+ // Terminate the synch thread if the synchronization cannot be
achieved.
+ catch (ClockSynchFailureException e)
+ {
+ log.debug("Cannot synchronize the clock (reference service may
be down). Terminating the synch thread.");
+ doSynch = false;
+ }
+ }
+ }
+
+ /**
+ * Gets the clock synchronizer that is kept continually up to date.
+ *
+ * @return The clock synchronizer that is kept continually up to date.
+ */
+ public ClockSynchronizer getClockSyncher()
+ {
+ return clockSyncher;
+ }
+
+ /**
+ * Supplies a shutdown hook, that terminates the synching thread.
+ *
+ * @return The shut down hook.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
+ {
+ public void run()
+ {
+ doSynch = false;
+ }
+ });
+ }
+}
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/ClockSynchronizer.java
Thu Aug 16 03:18:08 2007
@@ -41,8 +41,10 @@
{
/**
* The slave side should call this to copute a clock delta with the
reference.
+ *
+ * @throws ClockSynchFailureException If synchronization cannot be
achieved.
*/
- public void synch();
+ public void synch() throws ClockSynchFailureException;
/**
* Gets the clock delta in nano seconds.
Added:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java?view=auto&rev=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
(added)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/LocalClockSynchronizer.java
Thu Aug 16 03:18:08 2007
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.clocksynch;
+
+/**
+ * LocalClockSynchronizer is a fake [EMAIL PROTECTED] ClockSynchronizer} that
simply calls System.nanoTime(). It exists so that
+ * the same tests can be run distributed or locally, taking timings against
the ClockSynchronizer interface without
+ * being aware of how they are being run.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Supply the local clock with no delta.
+ * </table>
+ */
+public class LocalClockSynchronizer implements ClockSynchronizer
+{
+ /**
+ * The slave side should call this to copute a clock delta with the
reference.
+ *
+ * @throws
org.apache.qpid.test.framework.clocksynch.ClockSynchFailureException
+ * If synchronization cannot be achieved.
+ */
+ public void synch() throws ClockSynchFailureException
+ { }
+
+ /**
+ * Gets the clock delta in nano seconds.
+ *
+ * @return The clock delta in nano seconds.
+ */
+ public long getDelta()
+ {
+ return 0L;
+ }
+
+ /**
+ * Gets an estimate of the clock error in nan seconds.
+ *
+ * @return An estimate of the clock error in nan seconds.
+ */
+ public long getEpsilon()
+ {
+ return 0L;
+ }
+
+ /**
+ * Gets the local clock time with any computed delta added in.
+ *
+ * @return The local clock time with any computed delta added in.
+ */
+ public long nanoTime()
+ {
+ return System.nanoTime();
+ }
+}
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockReference.java
Thu Aug 16 03:18:08 2007
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.test.framework.clocksynch;
+import org.apache.log4j.Logger;
+
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
+import uk.co.thebadgerset.junit.extensions.ShutdownHookable;
+
/**
* UDPClockReference supplies a refernce clock signal (generated from
System.nanoTime()).
*
@@ -36,8 +40,11 @@
*
* @todo Errors rethrown as runtimes, or silently terminate the service. Could
add better error handling if needed.
*/
-public class UDPClockReference implements Runnable
+public class UDPClockReference implements Runnable, ShutdownHookable
{
+ /** Used for debugging. */
+ // private static final Logger log =
Logger.getLogger(UDPClockReference.class);
+
/** Defines the timeout to use when polling the socket for time requests.
*/
private static final int TIMEOUT = 200;
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/clocksynch/UDPClockSynchronizer.java
Thu Aug 16 03:18:08 2007
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.test.framework.clocksynch;
+import org.apache.log4j.Logger;
+
import uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
+import java.util.Arrays;
/**
* UDPClockSynchronizer is a [EMAIL PROTECTED] ClockSynchronizer} that sends
pings as UDP datagrams, and uses the following simple
@@ -43,7 +46,7 @@
* <li>The Slave repeats steps 2 through 4, 15 more times.</li>
* <li>The results of the packet receipts are accumulated and sorted in
lowest-latency to highest-latency order. The
* median latency is determined by picking the mid-point sample from this
ordered list.</li>
- * <li>All samples above approximately 1 standard-deviation from the median
are discarded and the remaining samples
+ * <li>All samples outside 1 standard-deviation from the median are discarded
and the remaining samples
* are averaged using an arithmetic mean.</li>
* </ol>
*
@@ -59,6 +62,12 @@
*/
public class UDPClockSynchronizer implements ClockSynchronizer
{
+ /** Used for debugging. */
+ // private static final Logger log =
Logger.getLogger(UDPClockSynchronizer.class);
+
+ /** Defines the timeout to use when waiting for responses to time
requests. */
+ private static final int TIMEOUT = 50;
+
/** The clock delta. */
private long delta = 0L;
@@ -71,6 +80,9 @@
/** Holds the socket to communicate with the reference service over. */
private DatagramSocket socket;
+ /** Used to control the shutdown in the main test loop. */
+ private static boolean doSynch = true;
+
/**
* Creates a clock synchronizer against the specified address for the
reference.
*
@@ -90,19 +102,26 @@
/**
* The slave side should call this to compute a clock delta with the
reference.
+ *
+ * @throws ClockSynchFailureException If synchronization cannot be
achieved, due to unavailability of the reference
+ * time service.
*/
- public void synch()
+ public void synch() throws ClockSynchFailureException
{
try
{
socket = new DatagramSocket();
+ socket.setSoTimeout(TIMEOUT);
// Synchronize on a single ping, to get the clock into the right
ball-park.
synch(1);
- // Synchronize on 15 pings for greater accuracy.
+ // Synchronize on 15 pings.
synch(15);
+ // And again, for greater accuracy, on 31.
+ synch(31);
+
socket.close();
}
catch (SocketException e)
@@ -115,9 +134,14 @@
* Updates the synchronization delta by performing the specified number of
reference clock requests.
*
* @param n The number of reference clock request cycles to perform.
+ *
+ * @throws ClockSynchFailureException If synchronization cannot be
achieved, due to unavailability of the reference
+ * time service.
*/
- protected void synch(int n)
+ protected void synch(int n) throws ClockSynchFailureException
{
+ // log.debug("protected void synch(int n = " + n + "): called");
+
// Create an array of deltas by performing n reference pings.
long[] delta = new long[n];
@@ -130,16 +154,23 @@
long median = median(delta);
long sd = standardDeviation(delta);
+ // log.debug("median = " + median);
+ // log.debug("sd = " + sd);
+
long[] tempDeltas = new long[n];
int count = 0;
for (int i = 0; i < n; i++)
{
- if (delta[i] <= (median + sd))
+ if ((delta[i] <= (median + sd)) && (delta[i] >= (median - sd)))
{
tempDeltas[count] = delta[i];
count++;
}
+ else
+ {
+ // log.debug("Rejected: " + delta[i]);
+ }
}
System.arraycopy(tempDeltas, 0, delta, 0, count);
@@ -150,6 +181,8 @@
// Estimate the error as the standard deviation of the remaining
deltas.
this.epsilon = standardDeviation(delta);
+ // log.debug("this.delta = " + this.delta);
+ // log.debug("this.epsilon = " + this.epsilon);
}
/**
@@ -157,31 +190,74 @@
* This is computed as the half-latency of the requst cycle, plus the
reference clock, minus the local clock.
*
* @return The estimated clock delta.
+ *
+ * @throws ClockSynchFailureException If the reference service is not
responding.
*/
- protected long ping()
+ protected long ping() throws ClockSynchFailureException
{
+ // log.debug("protected long ping(): called");
+
try
{
byte[] buf = new byte[256];
- DatagramPacket packet = new DatagramPacket(buf, buf.length,
referenceAddress, UDPClockReference.REFERENCE_PORT);
- // Start timing the request latency.
- long start = nanoTime();
+ boolean timedOut = false;
+ long start = 0L;
+ long refTime = 0L;
+ long localTime = 0L;
+ long latency = 0L;
+ int failCount = 0;
+
+ // Keep trying the ping until it gets a response, or 10 tries in a
row all time out.
+ do
+ {
+ // Start timing the request latency.
+ start = nanoTime();
+
+ // Get the reference time.
+ DatagramPacket packet =
+ new DatagramPacket(buf, buf.length, referenceAddress,
UDPClockReference.REFERENCE_PORT);
+ socket.send(packet);
+ packet = new DatagramPacket(buf, buf.length);
+
+ timedOut = false;
+
+ try
+ {
+ socket.receive(packet);
+ }
+ catch (SocketTimeoutException e)
+ {
+ timedOut = true;
+ failCount++;
+
+ continue;
+ }
+
+ ByteBuffer bbuf = ByteBuffer.wrap(packet.getData());
+ refTime = bbuf.getLong();
+
+ // Stop timing the request latency.
+ localTime = nanoTime();
+ latency = localTime - start;
+
+ // log.debug("refTime = " + refTime);
+ // log.debug("localTime = " + localTime);
+ // log.debug("start = " + start);
+ // log.debug("latency = " + latency);
+ // log.debug("delta = " + ((latency / 2) + (refTime -
localTime)));
- // Get the reference time.
- socket.send(packet);
- packet = new DatagramPacket(buf, buf.length);
- socket.receive(packet);
-
- ByteBuffer bbuf = ByteBuffer.wrap(packet.getData());
- long refTime = bbuf.getLong();
-
- // Stop timing the request latency.
- long localTime = nanoTime();
- long end = localTime - start;
+ }
+ while (timedOut && (failCount < 10));
+
+ // Fail completely if the fail count is too high.
+ if (failCount >= 10)
+ {
+ throw new ClockSynchFailureException("Clock reference not
responding.", null);
+ }
// Estimate delta as (ref clock + half-latency) - local clock.
- return ((end - start) / 2) + refTime - localTime;
+ return (latency / 2) + (refTime - localTime);
}
catch (IOException e)
{
@@ -228,18 +304,31 @@
*/
public static long median(long[] values)
{
+ // log.debug("public static long median(long[] values = " +
Arrays.toString(values) + "): called");
+
+ long median;
+
+ // Order the list of values.
+ long[] orderedValues = new long[values.length];
+ System.arraycopy(values, 0, orderedValues, 0, values.length);
+ Arrays.sort(orderedValues);
+
// Check if the median is computed from a pair of middle value.
- if ((values.length % 2) == 0)
+ if ((orderedValues.length % 2) == 0)
{
- int middle = values.length / 2;
+ int middle = orderedValues.length / 2;
- return (values[middle] + values[middle - 1]) / 2;
+ median = (orderedValues[middle] + orderedValues[middle - 1]) / 2;
}
// The median is computed from a single middle value.
else
{
- return values[values.length / 2];
+ median = orderedValues[orderedValues.length / 2];
}
+
+ // log.debug("median = " + median);
+
+ return median;
}
/**
@@ -251,6 +340,8 @@
*/
public static long mean(long[] values)
{
+ // log.debug("public static long mean(long[] values = " +
Arrays.toString(values) + "): called");
+
long total = 0L;
for (long value : values)
@@ -258,7 +349,11 @@
total += value;
}
- return total / values.length;
+ long mean = total / values.length;
+
+ // log.debug("mean = " + mean);
+
+ return mean;
}
/**
@@ -270,16 +365,23 @@
*/
public static long variance(long[] values)
{
+ // log.debug("public static long variance(long[] values = " +
Arrays.toString(values) + "): called");
+
long mean = mean(values);
long totalVariance = 0;
for (long value : values)
{
- totalVariance += (value - mean) ^ 2;
+ long diff = (value - mean);
+ totalVariance += diff * diff;
}
- return totalVariance / values.length;
+ long variance = totalVariance / values.length;
+
+ // log.debug("variance = " + variance);
+
+ return variance;
}
/**
@@ -291,7 +393,13 @@
*/
public static long standardDeviation(long[] values)
{
- return Double.valueOf(Math.sqrt(variance(values))).longValue();
+ // log.debug("public static long standardDeviation(long[] values = " +
Arrays.toString(values) + "): called");
+
+ long sd = Double.valueOf(Math.sqrt(variance(values))).longValue();
+
+ // log.debug("sd = " + sd);
+
+ return sd;
}
/**
@@ -314,11 +422,43 @@
// Create a clock synchronizer.
UDPClockSynchronizer clockSyncher = new UDPClockSynchronizer(address);
- // Perform a clock clockSyncher.
- clockSyncher.synch();
+ // Set up a shutdown hook for it.
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
+ {
+ public void run()
+ {
+ doSynch = false;
+ }
+ }));
+
+ // Repeat the clock synching until the user kills the progam.
+ while (doSynch)
+ {
+ // Perform a clock clockSynch.
+ try
+ {
+ clockSyncher.synch();
- // Print out the clock delta and estimate of the error.
- System.out.println("Delta = " + clockSyncher.getDelta());
- System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+ // Print out the clock delta and estimate of the error.
+ System.out.println("Delta = " + clockSyncher.getDelta());
+ System.out.println("Epsilon = " + clockSyncher.getEpsilon());
+
+ try
+ {
+ Thread.sleep(250);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status and terminate the loop.
+ Thread.currentThread().interrupt();
+ doSynch = false;
+ }
+ }
+ // Terminate if the reference time service is unavailable.
+ catch (ClockSynchFailureException e)
+ {
+ doSynch = false;
+ }
+ }
}
}
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
Thu Aug 16 03:18:08 2007
@@ -63,15 +63,12 @@
* to register results and timings for the test. This must work in such
a way that a new test cycle can be started
* without waiting for the results of the old one to come in.
*
- * @todo Test circuits to be created per test thread, not per test method
call. Per-thread setup and tear down to be
- * reposible for circuit creation and clean up. Many individual test
method calls to run over the same circuit.
- * Important, otherwise test results will be skewed by circuit creation
overheads.
- *
+ * @todo Add in setting of timing controller, from timing aware test cases.
*/
public class DistributedCircuitImpl implements Circuit, TimingControllerAware
{
/** Used for debugging purposes. */
- private static Logger log = Logger.getLogger(DistributedCircuitImpl.class);
+ private static final Logger log =
Logger.getLogger(DistributedCircuitImpl.class);
/** Holds the conversation factory over which to coordinate the test. */
protected ConversationFactory conversationFactory;
@@ -305,7 +302,8 @@
* differ though, as the final report is used to apply assertions,
and the ongoing report is just for
* periodic timing results... In which case, maybe there needs to be
a way for the onMessage method
* to process just some of the incoming messages, and forward the
rest on to the conversion helper, as
- * a sort of pre-conversation helper filter?
+ * a sort of pre-conversation helper filter? Make conversation
expose its onMessage method (it should
+ * already) and allow another delivery thread to filter the incoming
messages to the conversation.
*/
public void check()
{
@@ -365,7 +363,7 @@
// Apply receiver assertions to pass/fail the
tests.
// Log the test timings on the asynchronous
test timing controller.
- try
+ /*try
{
timingController.completeTest(true,
messageCount, testTime);
}
@@ -374,7 +372,7 @@
catch (InterruptedException e)
{
e.printStackTrace();
- }
+ }*/
}
log.debug("All receiver test reports received.");
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
Thu Aug 16 03:18:08 2007
@@ -54,7 +54,7 @@
public class TestClientCircuitEnd implements CircuitEnd,
TestClientControlledTest
{
/** Used for debugging. */
- Logger log = Logger.getLogger(TestClientCircuitEnd.class);
+ private static final Logger log =
Logger.getLogger(TestClientCircuitEnd.class);
/** Holds the test parameters. */
ParsedProperties testProps;
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
Thu Aug 16 03:18:08 2007
@@ -31,6 +31,7 @@
import org.apache.qpid.test.framework.MessagingTestConfigProperties;
import org.apache.qpid.test.framework.TestClientDetails;
import org.apache.qpid.test.framework.TestUtils;
+import org.apache.qpid.test.framework.clocksynch.UDPClockReference;
import org.apache.qpid.test.framework.listeners.XMLTestListener;
import org.apache.qpid.util.ConversationFactory;
import org.apache.qpid.util.PrettyPrintingUtils;
@@ -47,6 +48,7 @@
import javax.jms.*;
import java.io.*;
+import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
@@ -219,6 +221,10 @@
{ "-csv", "Output test results in CSV
format.", null, "false" },
{ "-xml", "Output test results in XML
format.", null, "false" },
{
+ "-trefaddr", "To specify an alternative to
hostname for time singal reference.",
+ "address", "false"
+ },
+ {
"c", "The number of tests to run
concurrently.", "num", "false",
MathUtils.SEQUENCE_REGEXP
},
@@ -303,7 +309,7 @@
if (testCaseClasses.isEmpty())
{
throw new RuntimeException(
- "No test cases implementing DistributedTestCase were
specified on the command line.");
+ "No test cases implementing FrameworkBaseCase were
specified on the command line.");
}
// Extract the names of all the test classes, to pass to the start
method.
@@ -382,6 +388,21 @@
log.debug("Got enlisted test client: " + client);
console.info("Test node " + client.clientName + " available.");
}
+
+ // Start the clock reference service running.
+ UDPClockReference clockReference = new UDPClockReference();
+ Thread clockRefThread = new Thread(clockReference);
+ registerShutdownHook(clockReference);
+ clockRefThread.start();
+
+ // Broadcast to all clients to synchronize their clocks against the
coordinators clock reference.
+ Message clockSynchRequest = session.createMessage();
+ clockSynchRequest.setStringProperty("CONTROL_TYPE", "CLOCK_SYNCH");
+
+ String localAddress =
InetAddress.getByName(InetAddress.getLocalHost().getHostName()).getHostAddress();
+ clockSynchRequest.setStringProperty("ADDRESS", localAddress);
+
+ conversation.send(controlTopic, clockSynchRequest);
// Run the test in the suite using JUnit.
TestResult result = null;
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java?view=diff&rev=566644&r1=566643&r2=566644
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/FanOutTestDecorator.java
Thu Aug 16 03:18:08 2007
@@ -83,6 +83,44 @@
allClients = availableClients;
conversationFactory = controlConversation;
connection = controlConnection;
+
+ // Sign available clients up to the test.
+ for (Test test : getAllUnderlyingTests())
+ {
+ FrameworkBaseCase coordTest = (FrameworkBaseCase) test;
+
+ // Get all of the clients able to participate in the test.
+ Set<TestClientDetails> enlists = signupClients(coordTest);
+
+ // Check that there were some clients available.
+ if (enlists.size() == 0)
+ {
+ throw new RuntimeException("No clients to test with");
+ }
+
+ // Create a distributed test circuit factory for the test.
+ CircuitFactory circuitFactory = getTestSequencer();
+
+ // Set up the first client in the sender role, and the remainder
in the receivers role.
+ Iterator<TestClientDetails> clients = enlists.iterator();
+ circuitFactory.setSender(clients.next());
+
+ while (clients.hasNext())
+ {
+ // Set the sending and receiving client details on the test
case.
+ circuitFactory.setReceiver(clients.next());
+ }
+
+ // Pass down the connection to hold the coordinating conversation
over.
+ circuitFactory.setConversationFactory(conversationFactory);
+
+ // If the current test case is a drop-in test, set it up as the
currently running test for late joiners to
+ // add in to. Otherwise the current test field is set to null, to
indicate that late joiners are not allowed.
+ currentTest = (coordTest instanceof DropInTest) ? coordTest : null;
+
+ // Execute the test case.
+ coordTest.setCircuitFactory(circuitFactory);
+ }
}
/**
@@ -100,8 +138,6 @@
{
log.debug("public void run(TestResult testResult): called");
- Collection<Test> tests = testSuite.getAllUnderlyingTests();
-
// Listen for late joiners on the control topic.
try
{
@@ -113,7 +149,7 @@
}
// Run all of the test cases in the test suite.
- for (Test test : getAllUnderlyingTests())
+ /*for (Test test : getAllUnderlyingTests())
{
FrameworkBaseCase coordTest = (FrameworkBaseCase) test;
@@ -148,6 +184,13 @@
// Execute the test case.
coordTest.setCircuitFactory(circuitFactory);
+ }*/
+
+ // Run all of the test cases in the test suite.
+ for (Test test : getAllUnderlyingTests())
+ {
+ FrameworkBaseCase coordTest = (FrameworkBaseCase) test;
+
coordTest.run(testResult);
currentTest = null;