Author: rajith
Date: Fri Nov 21 08:32:53 2008
New Revision: 719628
URL: http://svn.apache.org/viewvc?rev=719628&view=rev
Log:
This is related to QPID-1479.
For starters I have changed the IoSender.java IoReceiver.java and
AMQSession.java#Dispatcher to use the Thread factory to create the threads they
require.
The ThreadFactory has two implementations, the default being
DefaultThreadFactory, which creates plain java.lang.Thread instances.
The other is the RealtimeThreadFactory which uses reflection to create threads
with a specific priority.
-Dqpid.thread_factory=<thread_factory_class> will decide which thread factory
should be loaded.
-Dqpid.rt_thread_priority=<int> specifies the global real time thread priority
and defaults to 20.
You could also set individual thread priorities by adding the necessary
config+code changes.
I have also changed the Testkit and QpidBench to use the Thread factory so you
could use them for testing/benchmarking work on RT JVMs.
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Fri Nov 21 08:32:53 2008
@@ -67,16 +67,26 @@
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.CloseConsumerMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.JMSObjectMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,6 +281,8 @@
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
+
+ protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry _messageFactoryRegistry;
@@ -668,7 +680,7 @@
if (_dispatcher != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcher.interrupt();
+ _dispatcherThread.interrupt();
}
}
@@ -1852,7 +1864,7 @@
void startDispatcherIfNecessary()
{
//If we are the dispatcher then we don't need to check we are started
- if (Thread.currentThread() == _dispatcher)
+ if (Thread.currentThread() == _dispatcherThread)
{
return;
}
@@ -1883,9 +1895,23 @@
if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
+ try
+ {
+ _dispatcherThread =
Threading.getThreadFactory().createThread(_dispatcher);
+
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating Dispatcher thread",e);
+ }
+ _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
+ _dispatcherThread.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
- _dispatcher.start();
+ _dispatcherThread.start();
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(_dispatcherThread.getName() + "
created");
+ }
}
else
{
@@ -2606,7 +2632,7 @@
private static final Logger _dispatcherLogger =
LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
/** Responsible for decoding a message fragment and passing it to the
appropriate message consumer. */
- class Dispatcher extends Thread
+ class Dispatcher implements Runnable
{
/** Track the 'stopped' state of the dispatcher, a session starts in
the stopped state. */
@@ -2615,21 +2641,14 @@
private final Object _lock = new Object();
private String dispatcherID = "" + System.identityHashCode(this);
-
-
public Dispatcher()
{
- super("Dispatcher-Channel-" + _channelId);
- if (_dispatcherLogger.isInfoEnabled())
- {
- _dispatcherLogger.info(getName() + " created");
- }
}
public void close()
{
_closed.set(true);
- interrupt();
+ _dispatcherThread.interrupt();
// fixme awaitTermination
@@ -2708,7 +2727,7 @@
{
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(getName() + " started");
+ _dispatcherLogger.info(_dispatcherThread.getName() + "
started");
}
UnprocessedMessage message;
@@ -2771,7 +2790,7 @@
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(getName() + " thread terminating for
channel " + _channelId);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread
terminating for channel " + _channelId);
}
}
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java?rev=719628&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/DefaultThreadFactory.java
Fri Nov 21 08:32:53 2008
@@ -0,0 +1,36 @@
+package org.apache.qpid.thread;
+
+/**
+ * {@link ThreadFactory} implementation that hands out ordinary
+ * {@link Thread} instances.
+ */
+public class DefaultThreadFactory implements ThreadFactory
+{
+
+    /**
+     * Wraps the given task in a new, unstarted thread.
+     *
+     * @param r the task the thread will run
+     * @return a new thread for {@code r}
+     */
+    public Thread createThread(Runnable r)
+    {
+        return new Thread(r);
+    }
+
+    /**
+     * Wraps the given task in a new, unstarted thread and applies the
+     * requested priority via {@link Thread#setPriority(int)}.
+     *
+     * @param r the task the thread will run
+     * @param priority the priority to set on the new thread
+     * @return a new thread for {@code r} with its priority set
+     */
+    public Thread createThread(Runnable r, int priority)
+    {
+        final Thread thread = new Thread(r);
+        thread.setPriority(priority);
+        return thread;
+    }
+
+}
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java?rev=719628&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
Fri Nov 21 08:32:53 2008
@@ -0,0 +1,75 @@
+package org.apache.qpid.thread;
+
+import java.lang.reflect.Constructor;
+
+/**
+ * {@link ThreadFactory} that builds <code>javax.realtime.RealtimeThread</code>
+ * instances. Every <code>javax.realtime</code> class is looked up by name
+ * through reflection, so this class compiles without those classes present.
+ */
+public class RealtimeThreadFactory implements ThreadFactory
+{
+    /** RealtimeThread constructor: five parameter objects followed by a Runnable. */
+    private final Constructor<?> threadConstructor;
+
+    /** The <code>PriorityParameters(int)</code> constructor. */
+    private final Constructor<?> priorityParameterConstructor;
+
+    /** Priority applied by {@link #createThread(Runnable)}. */
+    private final int defaultRTThreadPriority;
+
+    /**
+     * Resolves the reflective constructors and reads the default priority
+     * from the <code>qpid.rt_thread_priority</code> system property,
+     * falling back to 20.
+     *
+     * @throws Exception if a <code>javax.realtime</code> class or one of the
+     *         required constructors cannot be found
+     */
+    public RealtimeThreadFactory() throws Exception
+    {
+        defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority", 20);
+
+        Class<?> threadClass = Class.forName("javax.realtime.RealtimeThread");
+        Class<?>[] paramTypes = new Class<?>[]{
+            Class.forName("javax.realtime.SchedulingParameters"),
+            Class.forName("javax.realtime.ReleaseParameters"),
+            Class.forName("javax.realtime.MemoryParameters"),
+            Class.forName("javax.realtime.MemoryArea"),
+            Class.forName("javax.realtime.ProcessingGroupParameters"),
+            Runnable.class};
+        threadConstructor = threadClass.getConstructor(paramTypes);
+
+        Class<?> priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
+        priorityParameterConstructor = priorityParameterClass.getConstructor(int.class);
+    }
+
+    /**
+     * Creates a real-time thread for the task at the default priority.
+     *
+     * @param r the task the thread will run
+     * @return the new thread
+     * @throws Exception if reflective construction fails
+     */
+    public Thread createThread(Runnable r) throws Exception
+    {
+        return createThread(r, defaultRTThreadPriority);
+    }
+
+    /**
+     * Creates a real-time thread for the task with scheduling parameters
+     * built from the given priority. The release, memory, memory-area and
+     * processing-group arguments are passed as null.
+     *
+     * @param r the task the thread will run
+     * @param priority the value handed to <code>PriorityParameters(int)</code>
+     * @return the new thread
+     * @throws Exception if reflective construction fails
+     */
+    public Thread createThread(Runnable r, int priority) throws Exception
+    {
+        Object priorityParams = priorityParameterConstructor.newInstance(priority);
+        return (Thread) threadConstructor.newInstance(priorityParams, null, null, null, null, r);
+    }
+
+}
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java?rev=719628&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/ThreadFactory.java
Fri Nov 21 08:32:53 2008
@@ -0,0 +1,27 @@
+package org.apache.qpid.thread;
+
+/**
+ * Abstraction over thread creation so that callers do not instantiate
+ * {@link Thread} directly.
+ */
+public interface ThreadFactory
+{
+    /**
+     * Creates a thread that will run the given task.
+     *
+     * @param r the task for the new thread
+     * @return the thread created for {@code r}
+     * @throws Exception if the implementation fails to create the thread
+     */
+    Thread createThread(Runnable r) throws Exception;
+
+    /**
+     * Creates a thread that will run the given task at the given priority.
+     *
+     * @param r the task for the new thread
+     * @param priority the priority requested for the new thread
+     * @return the thread created for {@code r}
+     * @throws Exception if the implementation fails to create the thread
+     */
+    Thread createThread(Runnable r, int priority) throws Exception;
+}
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java?rev=719628&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
Fri Nov 21 08:32:53 2008
@@ -0,0 +1,46 @@
+package org.apache.qpid.thread;
+
+/**
+ * Holds the {@link ThreadFactory} shared by all callers.
+ *
+ * The factory class is named by the <code>qpid.thread_factory</code> system
+ * property, falling back to
+ * <code>org.apache.qpid.thread.DefaultThreadFactory</code>, and is
+ * instantiated once through its no-argument constructor when this class is
+ * initialised.
+ */
+public final class Threading
+{
+    /** The single factory instance, assigned by the static initialiser. */
+    private static final ThreadFactory threadFactory;
+
+    static
+    {
+        try
+        {
+            Class<?> threadFactoryClass =
+                Class.forName(System.getProperty("qpid.thread_factory",
+                                                 "org.apache.qpid.thread.DefaultThreadFactory"));
+
+            threadFactory = (ThreadFactory) threadFactoryClass.newInstance();
+        }
+        catch (Exception e)
+        {
+            // Rethrown as an Error so a bad factory setting fails class initialisation.
+            throw new Error("Error occured while loading thread factory", e);
+        }
+    }
+
+    /** Static holder only; never instantiated. */
+    private Threading()
+    {
+    }
+
+    /**
+     * @return the factory created when this class was initialised
+     */
+    public static ThreadFactory getThreadFactory()
+    {
+        return threadFactory;
+    }
+}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
Fri Nov 21 08:32:53 2008
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
@@ -35,7 +36,7 @@
*
*/
-final class IoReceiver extends Thread
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
@@ -46,6 +47,7 @@
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread receiverThread;
public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver,
int bufferSize, long timeout)
@@ -55,10 +57,18 @@
this.bufferSize = bufferSize;
this.socket = transport.getSocket();
this.timeout = timeout;
-
- setDaemon(true);
- setName(String.format("IoReceiver - %s",
socket.getRemoteSocketAddress()));
- start();
+
+ try
+ {
+ receiverThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOReceiver thread",e);
+ }
+ receiverThread.setDaemon(true);
+ receiverThread.setName(String.format("IoReceiver - %s",
socket.getRemoteSocketAddress()));
+ receiverThread.start();
}
void close(boolean block)
@@ -75,10 +85,10 @@
{
socket.shutdownInput();
}
- if (block && Thread.currentThread() != this)
+ if (block && Thread.currentThread() != receiverThread)
{
- join(timeout);
- if (isAlive())
+ receiverThread.join(timeout);
+ if (receiverThread.isAlive())
{
throw new TransportException("join timed out");
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
Fri Nov 21 08:32:53 2008
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
@@ -32,7 +33,7 @@
import static org.apache.qpid.transport.util.Functions.*;
-public final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender implements Runnable, Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
@@ -54,7 +55,8 @@
private final Object notFull = new Object();
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
-
+ private final Thread senderThread;
+
private volatile Throwable exception = null;
@@ -74,9 +76,18 @@
throw new TransportException("Error getting output stream for
socket", e);
}
- setDaemon(true);
- setName(String.format("IoSender - %s",
socket.getRemoteSocketAddress()));
- start();
+ try
+ {
+ senderThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOSender thread",e);
+ }
+
+ senderThread.setDaemon(true);
+ senderThread.setName(String.format("IoSender - %s",
socket.getRemoteSocketAddress()));
+ senderThread.start();
}
private static final int pof2(int n)
@@ -188,10 +199,10 @@
try
{
- if (Thread.currentThread() != this)
+ if (Thread.currentThread() != senderThread)
{
- join(timeout);
- if (isAlive())
+ senderThread.join(timeout);
+ if (senderThread.isAlive())
{
throw new SenderException("join timed out");
}
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
Fri Nov 21 08:32:53 2008
@@ -37,6 +37,7 @@
import javax.jms.TextMessage;
import org.apache.qpid.testkit.MessageFactory;
+import org.apache.qpid.thread.Threading;
/**
* Latency test sends an x number of messages in warmup mode and wait for a
confirmation
@@ -314,19 +315,36 @@
public static void main(String[] args)
{
- LatencyTest latencyTest = new LatencyTest();
- latencyTest.test();
- latencyTest.printToConsole();
- if (System.getProperty("file") != null)
+ final LatencyTest latencyTest = new LatencyTest();
+ Runnable r = new Runnable()
{
- try
+ public void run()
{
- latencyTest.writeToFile();
- }
- catch(Exception e)
- {
- e.printStackTrace();
+ latencyTest.test();
+ latencyTest.printToConsole();
+ if (System.getProperty("file") != null)
+ {
+ try
+ {
+ latencyTest.writeToFile();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
}
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating latency test thread",e);
}
+ t.start();
}
}
\ No newline at end of file
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
Fri Nov 21 08:32:53 2008
@@ -27,6 +27,8 @@
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* PerfConsumer will receive x no of messages in warmup mode.
* Once it receives the Start message it will then signal the PerfProducer.
@@ -242,7 +244,24 @@
public static void main(String[] args)
{
- PerfConsumer cons = new PerfConsumer();
- cons.test();
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
}
}
\ No newline at end of file
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
Fri Nov 21 08:32:53 2008
@@ -27,6 +27,7 @@
import javax.jms.MessageProducer;
import org.apache.qpid.testkit.MessageFactory;
+import org.apache.qpid.thread.Threading;
/**
* PerfProducer sends an x no of messages in warmup mode and wait for a
confirmation
@@ -201,7 +202,24 @@
public static void main(String[] args)
{
- PerfProducer prod = new PerfProducer();
- prod.test();
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
}
\ No newline at end of file
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
Fri Nov 21 08:32:53 2008
@@ -29,6 +29,8 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* Test Description
* ================
@@ -67,7 +69,7 @@
{
final Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -131,7 +133,18 @@
}
- });
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+
t.setName("session-" + i);
t.start();
} // for loop
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
Fri Nov 21 08:32:53 2008
@@ -32,6 +32,7 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -79,7 +80,7 @@
for (int i = 0; i < session_count; i++)
{
final Session session = con.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
+ Runnable r = new Runnable()
{
private Random gen = new Random();
@@ -142,7 +143,16 @@
}
- });
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
t.setName("session-" + i);
t.start();
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
Fri Nov 21 08:32:53 2008
@@ -30,6 +30,7 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -131,8 +132,26 @@
public static void main(String[] args)
{
- ResourceLeakTest test = new ResourceLeakTest();
- test.test();
+ final ResourceLeakTest test = new ResourceLeakTest();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating test thread",e);
+ }
+ // The factory only creates the thread; without start() the Runnable
+ // never runs and main() returns without executing the test.
+ t.start();
}
}
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
Fri Nov 21 08:32:53 2008
@@ -29,6 +29,8 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.qpid.thread.Threading;
+
/**
* Test Description
* ================
@@ -126,9 +128,27 @@
public static void main(String[] args)
{
- SimpleConsumer test = new SimpleConsumer();
- test.setUp();
- test.test();
+ final SimpleConsumer test = new SimpleConsumer();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.setUp();
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ // The factory only creates the thread; without start() the Runnable
+ // never runs and main() returns without executing the test.
+ t.start();
}
}
Modified:
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
Fri Nov 21 08:32:53 2008
@@ -33,6 +33,7 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
@@ -138,9 +139,27 @@
public static void main(String[] args)
{
- SimpleProducer test = new SimpleProducer();
- test.setUp();
- test.test();
+ final SimpleProducer test = new SimpleProducer();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.setUp();
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ // The factory only creates the thread; without start() the Runnable
+ // never runs and main() returns without executing the test.
+ t.start();
}
}
Modified:
incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java?rev=719628&r1=719627&r2=719628&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
(original)
+++
incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
Fri Nov 21 08:32:53 2008
@@ -20,23 +20,45 @@
*/
package org.apache.qpid.tools;
-import java.lang.reflect.InvocationTargetException;
+import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
+import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
+import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
+
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
-import javax.jms.*;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeBind;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageSubscribe;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.QueueDeclare;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import static org.apache.qpid.tools.QpidBench.Mode.*;
-
/**
* QpidBench
*
@@ -412,7 +434,7 @@
{
case CONSUME:
case BOTH:
- new Thread()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -432,7 +454,18 @@
throw new RuntimeException(e);
}
}
- }.start();
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
break;
}
@@ -440,7 +473,7 @@
{
case PUBLISH:
case BOTH:
- new Thread()
+ Runnable r = new Runnable()
{
public void run()
{
@@ -460,7 +493,17 @@
throw new RuntimeException(e);
}
}
- }.start();
+ };
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating publisher thread",e);
+ }
+ t.start();
break;
}
}