I've made some modifications to the org.apache.jmeter.threads.ListenerNotifier which make the code easier to understand and might have a bit less overhead.
One important note: this patch uses the "Buffer" from Jakarta Commons Collections. I think that in this case there is a slight performance benefit (especially reduced memory usage) of using Buffer instead of LinkedList. I don't think this single use of Commons Collections is enough to justify adding Collections to the distribution, but I think we could probably make use of Collections classes elsewhere, so it might be a reasonable addition to the distribution. In any case, if we don't want to add this dependency on Collections right now then it would be reasonably easy for me to switch to LinkedList. Just let me know. If we do add Collections to the distribution, I tested with Collections 2.1. It should just be a matter of adding the jar to the lib folder and making an update to build.xml.
Another note: the current ListenerNotifier doesn't have a license in the file. I wasn't sure if I have the right to add it, even though nearly every line in the file ended up changing. So assuming that the file should have the Apache license, the commiter should probably add it to the file.
As always, let me know if there are any questions about this change.
Jeremy
Index: jakarta-jmeter/src/core/org/apache/jmeter/threads/ListenerNotifier.java
===================================================================
RCS file:
/home/cvspublic/jakarta-jmeter/src/core/org/apache/jmeter/threads/ListenerNotifier.java,v
retrieving revision 1.10
diff -u -r1.10 ListenerNotifier.java
--- jakarta-jmeter/src/core/org/apache/jmeter/threads/ListenerNotifier.java 5 Feb
2003 05:12:09 -0000 1.10
+++ jakarta-jmeter/src/core/org/apache/jmeter/threads/ListenerNotifier.java 9 Mar
2003 14:34:35 -0000
@@ -8,105 +8,234 @@
import org.apache.jmeter.util.JMeterUtils;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.UnboundedFifoBuffer;
+
/**
- * @author Administrator
+ * The <code>ListenerNotifier</code> thread is responsible for performing
+ * asynchronous notifications that a sample has occurred. Each time a
+ * sample occurs, the <code>addLast</code> method should be called to add
+ * the sample and its list of listeners to the notification queue. This
+ * thread will then notify those listeners asynchronously at some future
+ * time.
+ * <p>
+ * In the current implementation, the notifications will be made in batches,
+ * with 2 seconds between the beginning of successive batches. If the
+ * notifier thread starts to get behind, the priority of the thread will be
+ * increased in an attempt to help it to keep up.
*
- * To change this generated comment edit the template variable "typecomment":
- * Window>Preferences>Java>Templates.
+ * @see org.apache.jmeter.samplers.SampleListener
*/
-public class ListenerNotifier extends LinkedList implements Runnable
-{
- private static Logger log =
- LoggingManager.getLoggerFor(JMeterUtils.ENGINE);
- /**
- * @see java.lang.Runnable#run()
- */
- boolean running;
- boolean isStopped;
- static private int ABS_MAX = 500;
- static private int MAX = 200;
- int sleepTime = 2000;
- public ListenerNotifier()
- {
- super();
- running = true;
- isStopped = true;
- }
- public void run()
- {
- Iterator iter;
- while (running || this.size() > 1)
- {
- SampleEvent res = (SampleEvent) this.removeFirst();
- if (res != null)
- {
- List listeners = (List) this.removeFirst();
- notifyListeners(res, listeners);
- }
- try
- {
- Thread.sleep(sleepTime);
- }
- catch (InterruptedException e)
- {
- }
- if (size() > MAX && Thread.currentThread().getPriority() ==
Thread.NORM_PRIORITY)
- {
- log.debug("Notifier thread priority going from normal
to max, size = "+size());
-
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
- }
- }
- log.debug("Listener Notifier stopped");
- isStopped = true;
- }
- private void notifyListeners(SampleEvent res, List listeners)
- {
- Iterator iter;
- iter = listeners.iterator();
- while (iter.hasNext())
- {
- ((SampleListener) iter.next()).sampleOccurred(res);
- }
- }
- public boolean isStopped()
- {
- return isStopped;
- }
- public synchronized void addLast(SampleEvent item, List listeners)
- {
- if(size() > ABS_MAX)
- {
- notifyListeners(item,listeners);
- }
- else
- {
- super.addLast(item);
- super.addLast(listeners);
- sleepTime = 0;
- }
- }
- public synchronized Object removeFirst()
- {
- try
- {
- return super.removeFirst();
- }
- catch (RuntimeException e)
- {
- sleepTime = 2000;
- log.debug("Setting notifier thread priority to normal");
- Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
- return null;
- }
- }
- public void stop()
- {
- running = false;
- }
- public void start()
- {
- Thread noteThread = new Thread(this);
- noteThread.start();
- isStopped = false;
- }
+public class ListenerNotifier implements Runnable {
+ private static Logger log =
+ LoggingManager.getLoggerFor(JMeterUtils.ENGINE);
+
+ /**
+ * The number of milliseconds between batches of notifications.
+ */
+ private static final int SLEEP_TIME = 2000;
+
+ /**
+ * Indicates whether or not this thread should remain running. The
+ * thread will continue running after this field is set to false until
+ * the next batch of notifications has been completed and the
+ * notification queue is empty.
+ */
+ private boolean running = true;
+
+ /**
+ * Indicates whether or not this thread has stopped. No further
+ * notifications will be performed.
+ */
+ private boolean isStopped = true;
+
+ /**
+ * The queue containing the notifications to be performed. Each
+ * notification consists of a pair of entries in this queue. The
+ * first is the [EMAIL PROTECTED] org.apache.jmeter.samplers.SampleEvent
+ * SampleEvent} representing the sample. The second is a List of
+ * [EMAIL PROTECTED] org.apache.jmeter.samplers.SampleListener SampleListener}s
+ * which should be notified.
+ */
+ private Buffer listenerEvents =
+ BufferUtils.synchronizedBuffer(new UnboundedFifoBuffer());
+
+
+ /**
+ * Starts running the ListenerNotifier thread.
+ */
+ public void start() {
+ isStopped = false;
+ new Thread(this).start();
+ }
+
+ /**
+ * Stops the ListenerNotifier thread. The thread will continue processing
+ * any events remaining in the notification queue before it actually
+ * stops, but this method will return immediately.
+ */
+ public void stop() {
+ running = false;
+ }
+
+ /**
+ * Indicates whether or not the thread has stopped. This will not
+ * return true until the <code>stop</code> method has been called and
+ * any remaining notifications in the queue have been completed.
+ *
+ * @return true if the ListenerNotifier has completely stopped, false
+ * otherwise
+ */
+ public boolean isStopped() {
+ return isStopped;
+ }
+
+ /**
+ * Process the events in the notification queue until the thread has
+ * been told to stop and the notification queue is empty.
+ * <p>
+ * In the current implementation, this method will iterate continually
+ * until the thread is told to stop. In each iteration it will process
+ * any notifications that are in the queue at the beginning of the
+ * iteration, and then will sleep until it is time to start the next
+ * batch. As long as the thread is keeping up, each batch should start
+ * 2 seconds after the beginning of the last batch. This exact
+ * behavior is subject to change.
+ */
+ public void run() {
+ boolean isMaximumPriority = false;
+ int normalCount = 0;
+
+ while (running) {
+ long startTime = System.currentTimeMillis();
+ processNotifications();
+ long sleep = SLEEP_TIME - (System.currentTimeMillis() - startTime);
+
+ // If the thread has been told to stop then we shouldn't sleep
+ if (!running) {
+ break;
+ }
+
+ if (sleep < 0) {
+ isMaximumPriority = true;
+ normalCount = 0;
+ if (log.isInfoEnabled()) {
+ log.info("ListenerNotifier exceeded maximum " +
+ "notification time by " + (-sleep) + "ms");
+ }
+ boostPriority();
+ } else {
+ normalCount++;
+
+ // If there have been three consecutive iterations since the
+ // last iteration which took too long to execute, return the
+ // thread to normal priority.
+ if (isMaximumPriority && normalCount >= 3) {
+ isMaximumPriority = false;
+ unboostPriority();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("ListenerNotifier sleeping for " + sleep + "ms");
+ }
+
+ try {
+ Thread.sleep (sleep);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ // Make sure that all pending notifications are processed before
+ // actually ending the thread.
+ processNotifications();
+ isStopped = true;
+ }
+
+ /**
+ * Process all of the pending notifications. Only the samples which are
+ * in the queue when this method is called will be processed. Any samples
+ * added between the time when this method is called and when it exits are
+ * saved for the next batch.
+ */
+ private void processNotifications() {
+ int listenerEventsSize = listenerEvents.size();
+ if (log.isDebugEnabled()) {
+ log.debug ("ListenerNotifier: processing " + listenerEventsSize +
+ " events");
+ }
+
+ while (listenerEventsSize > 0) {
+ // Since this is a FIFO and this is the only place we remove
+ // from it (only from a single thread) we don't have to remove
+ // these two items in one atomic operation. Each individual
+ // remove is atomic (because we use a synchronized buffer),
+ // which is necessary since the buffer can be accessed from
+ // other threads (to add things to the buffer).
+ SampleEvent res = (SampleEvent)listenerEvents.remove();
+ List listeners = (List)listenerEvents.remove();
+
+ notifyListeners (res, listeners);
+
+ listenerEventsSize -= 2;
+ }
+ }
+
+ /**
+ * Boost the priority of the current thread to maximum priority. If
+ * the thread is already at maximum priority then this will have no
+ * effect.
+ */
+ private void boostPriority() {
+ if (Thread.currentThread().getPriority() != Thread.MAX_PRIORITY) {
+ log.info("ListenerNotifier: Boosting thread priority to maximum.");
+ Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
+ }
+ }
+
+ /**
+ * Return the priority of the current thread to normal. If the thread
+ * is already at normal priority then this will have no effect.
+ */
+ private void unboostPriority() {
+ if (Thread.currentThread().getPriority() != Thread.NORM_PRIORITY) {
+ log.info("ListenerNotifier: Returning thread priority to normal.");
+ Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
+ }
+ }
+
+ /**
+ * Notify a list of listeners that a sample has occurred.
+ *
+ * @param res the sample event that has occurred. Must be non-null.
+ * @param listeners a list of the listeners which should be notified.
+ * This list must not be null and must contain only
+ * SampleListener elements.
+ */
+ private void notifyListeners(SampleEvent res, List listeners) {
+ Iterator iter = listeners.iterator();
+ while (iter.hasNext()) {
+ ((SampleListener) iter.next()).sampleOccurred(res);
+ }
+ }
+
+ /**
+ * Add a new sample event to the notification queue. The notification
+ * will be performed asynchronously and this method will return
+ * immediately.
+ *
+ * @param item the sample event that has occurred. Must be non-null.
+ * @param listeners a list of the listeners which should be notified.
+ * This list must not be null and must contain only
+ * SampleListener elements.
+ */
+ public void addLast(SampleEvent item, List listeners) {
+ // Must use explicit synchronization here so that the item and
+ // listeners are added together atomically
+ synchronized (listenerEvents) {
+ listenerEvents.add(item);
+ listenerEvents.add(listeners);
+ }
+ }
}--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
