Author: carnold
Date: Fri May 19 13:00:36 2006
New Revision: 407896

URL: http://svn.apache.org/viewvc?rev=407896&view=rev
Log:
Bug 38982: Add non-blocking option for AsyncAppender

Modified:
    
logging/log4j/branches/v1_2-branch/src/java/org/apache/log4j/AsyncAppender.java

Modified: 
logging/log4j/branches/v1_2-branch/src/java/org/apache/log4j/AsyncAppender.java
URL: 
http://svn.apache.org/viewvc/logging/log4j/branches/v1_2-branch/src/java/org/apache/log4j/AsyncAppender.java?rev=407896&r1=407895&r2=407896&view=diff
==============================================================================
--- 
logging/log4j/branches/v1_2-branch/src/java/org/apache/log4j/AsyncAppender.java 
(original)
+++ 
logging/log4j/branches/v1_2-branch/src/java/org/apache/log4j/AsyncAppender.java 
Fri May 19 13:00:36 2006
@@ -1,12 +1,12 @@
 /*
- * Copyright 1999-2005 The Apache Software Foundation.
- * 
+ * Copyright 1999,2006 The Apache Software Foundation.
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,322 +16,573 @@
 
 // Contibutors:  Aaron Greenhouse <[EMAIL PROTECTED]>
 //               Thomas Tuft Muller <[EMAIL PROTECTED]>
-
 package org.apache.log4j;
 
-import org.apache.log4j.spi.LoggingEvent;
-import org.apache.log4j.helpers.BoundedFIFO;
-import org.apache.log4j.spi.AppenderAttachable;
 import org.apache.log4j.helpers.AppenderAttachableImpl;
-import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.AppenderAttachable;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.text.MessageFormat;
+
+import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 
 /**
-   The AsyncAppender lets users log events asynchronously. It uses a
-   bounded buffer to store logging events.
+ * The AsyncAppender lets users log events asynchronously.
+ * <p/>
+ * <p/>
+ * The AsyncAppender will collect the events sent to it and then dispatch them
+ * to all the appenders that are attached to it. You can attach multiple
+ * appenders to an AsyncAppender.
+ * </p>
+ * <p/>
+ * <p/>
+ * The AsyncAppender uses a separate thread to serve the events in its buffer.
+ * </p>
+ * <p/>
+ * <b>Important note:</b> The <code>AsyncAppender</code> can only be script
+ * configured using the [EMAIL PROTECTED] 
org.apache.log4j.xml.DOMConfigurator}.
+ * </p>
+ *
+ * @author Ceki G&uuml;lc&uuml;
+ * @author Curt Arnold
+ * @since 0.9.1
+ */
+public class AsyncAppender extends AppenderSkeleton
+  implements AppenderAttachable {
+  /**
+   * The default buffer size is set to 128 events.
+   */
+  public static final int DEFAULT_BUFFER_SIZE = 128;
 
-   <p>The AsyncAppender will collect the events sent to it and then
-   dispatch them to all the appenders that are attached to it. You can
-   attach multiple appenders to an AsyncAppender.
+  /**
+   * Event buffer, also used as monitor to protect itself and
+   * discardMap from simulatenous modifications.
+   */
+  private final List buffer = new ArrayList();
 
-   <p>The AsyncAppender uses a separate thread to serve the events in
-   its bounded buffer.
+  /**
+   * Map of DiscardSummary objects keyed by logger name.
+   */
+  private final Map discardMap = new HashMap();
 
-   <p>Refer to the results in [EMAIL PROTECTED] 
org.apache.log4j.performance.Logging}
-   for the impact of using this appender.
+  /**
+   * Buffer size.
+   */
+  private int bufferSize = DEFAULT_BUFFER_SIZE;
 
-   <p><b>Important note:</b> The <code>AsyncAppender</code> can only
-   be script configured using the [EMAIL PROTECTED]
-   org.apache.log4j.xml.DOMConfigurator}.
+  /** Nested appenders. */
+  AppenderAttachableImpl aai;
 
-   @author Ceki G&uuml;lc&uuml;
-   @since 0.9.1 */
-public class AsyncAppender extends AppenderSkeleton
-                                            implements AppenderAttachable {
+  /**
+   * Nested appenders.
+   */
+  private final AppenderAttachableImpl appenders;
 
-  /** The default buffer size is set to 128 events. */
-  public static final int DEFAULT_BUFFER_SIZE = 128;
+  /**
+   * Dispatcher.
+   */
+  private final Thread dispatcher;
 
-  //static Category cat = Category.getInstance(AsyncAppender.class.getName());
+  /**
+   * Should location info be included in dispatched messages.
+   */
+  private boolean locationInfo = false;
 
-  BoundedFIFO bf = new BoundedFIFO(DEFAULT_BUFFER_SIZE);
+  /**
+   * Does appender block when buffer is full.
+   */
+  private boolean blocking = true;
 
-  AppenderAttachableImpl aai;
-  Dispatcher dispatcher;
-  boolean locationInfo = false;
+  /**
+   * Create new instance.
+   */
+  public AsyncAppender() {
+    appenders = new AppenderAttachableImpl();
 
-  boolean interruptedWarningMessage = false;
+    //
+    //   only set for compatibility
+    aai = appenders;
 
-  public AsyncAppender() {
-    // Note: The dispatcher code assumes that the aai is set once and
-    // for all.
-    aai = new AppenderAttachableImpl();
-    dispatcher = new Dispatcher(bf, this);
+    dispatcher =
+      new Thread(new Dispatcher(this, buffer, discardMap, appenders));
+
+    // It is the user's responsibility to close appenders before
+    // exiting.
+    dispatcher.setDaemon(true);
+
+    // set the dispatcher priority to lowest possible value
+    //        dispatcher.setPriority(Thread.MIN_PRIORITY);
+    dispatcher.setName("Dispatcher-" + dispatcher.getName());
     dispatcher.start();
   }
 
-
-  public void addAppender(Appender newAppender) {
-    synchronized(aai) {
-      aai.addAppender(newAppender);
+  /**
+   * Add appender.
+   *
+   * @param newAppender appender to add, may not be null.
+   */
+  public void addAppender(final Appender newAppender) {
+    synchronized (appenders) {
+      appenders.addAppender(newAppender);
     }
   }
 
-  public void append(LoggingEvent event) {
+  /**
+   * [EMAIL PROTECTED]
+   */
+  public void append(final LoggingEvent event) {
     //
     //   if dispatcher thread has died then
     //      append subsequent events synchronously
     //   See bug 23021
-    if (!dispatcher.isAlive()) {
-        synchronized(aai) {
-          aai.appendLoopOnAppenders(event);
-        }
-        return;
+    if ((dispatcher == null) || !dispatcher.isAlive() || (bufferSize <= 0)) {
+      synchronized (appenders) {
+        appenders.appendLoopOnAppenders(event);
+      }
+
+      return;
     }
+
     // Set the NDC and thread name for the calling thread as these
     // LoggingEvent fields were not set at event creation time.
     event.getNDC();
     event.getThreadName();
     // Get a copy of this thread's MDC.
     event.getMDCCopy();
-    if(locationInfo) {
+    if (locationInfo) {
       event.getLocationInformation();
     }
-    synchronized(bf) {
-      while(bf.isFull()) {
-       try {
-         //LogLog.debug("Waiting for free space in buffer, "+bf.length());
-         bf.wait();
-       } catch(InterruptedException e) {
-         if(!interruptedWarningMessage) {
-           interruptedWarningMessage = true;
-           LogLog.warn("AsyncAppender interrupted.", e);
-         } else {
-           LogLog.warn("AsyncAppender interrupted again.");
-         }
-       }
-      }
 
-      //cat.debug("About to put new event in buffer.");
-      bf.put(event);
-      if(bf.wasEmpty()) {
-       //cat.debug("Notifying dispatcher to process events.");
-       bf.notify();
+    synchronized (buffer) {
+      while (true) {
+        int previousSize = buffer.size();
+
+        if (previousSize < bufferSize) {
+          buffer.add(event);
+
+          //
+          //   if buffer had been empty
+          //       signal all threads waiting on buffer
+          //       to check their conditions.
+          //
+          if (previousSize == 0) {
+            buffer.notifyAll();
+          }
+
+          break;
+        }
+
+        //
+        //   Following code is only reachable if buffer is full
+        //
+        //
+        //   if blocking and thread is not already interrupted
+        //      and not the dispatcher then
+        //      wait for a buffer notification
+        boolean discard = true;
+        if (blocking
+                && !Thread.interrupted()
+                && Thread.currentThread() != dispatcher) {
+          try {
+            buffer.wait();
+            discard = false;
+          } catch (InterruptedException e) {
+            //
+            //  reset interrupt status so
+            //    calling code can see interrupt on
+            //    their next wait or sleep.
+            Thread.currentThread().interrupt();
+          }
+        }
+
+        //
+        //   if blocking is false or thread has been interrupted
+        //   add event to discard map.
+        //
+        if (discard) {
+          String loggerName = event.getLoggerName();
+          DiscardSummary summary = (DiscardSummary) discardMap.get(loggerName);
+
+          if (summary == null) {
+            summary = new DiscardSummary(event);
+            discardMap.put(loggerName, summary);
+          } else {
+            summary.add(event);
+          }
+
+          break;
+        }
       }
     }
   }
 
   /**
-     Close this <code>AsyncAppender</code> by interrupting the
-     dispatcher thread which will process all pending events before
-     exiting.
-  */
+   * Close this <code>AsyncAppender</code> by interrupting the dispatcher
+   * thread which will process all pending events before exiting.
+   */
   public void close() {
-    synchronized(this) {
-      // avoid multiple close, otherwise one gets NullPointerException
-      if(closed) { 
-       return;
-      }
+    /**
+     * Set closed flag and notify all threads to check their conditions.
+     * Should result in dispatcher terminating.
+     */
+    synchronized (buffer) {
       closed = true;
+      buffer.notifyAll();
     }
 
-    // The following cannot be synchronized on "this" because the
-    // dispatcher synchronizes with "this" in its while loop. If we
-    // did synchronize we would systematically get deadlocks when
-    // close was called.
-    dispatcher.close();
     try {
       dispatcher.join();
-    } catch(InterruptedException e) {
-      LogLog.error("Got an InterruptedException while waiting for the "+
-                  "dispatcher to finish.", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      org.apache.log4j.helpers.LogLog.error(
+        "Got an InterruptedException while waiting for the "
+        + "dispatcher to finish.", e);
+    }
+
+    //
+    //    close all attached appenders.
+    //
+    synchronized (appenders) {
+      Enumeration iter = appenders.getAllAppenders();
+
+      if (iter != null) {
+        while (iter.hasMoreElements()) {
+          Object next = iter.nextElement();
+
+          if (next instanceof Appender) {
+            ((Appender) next).close();
+          }
+        }
+      }
     }
-    dispatcher = null;
-    bf = null;
   }
 
+  /**
+   * Get iterator over attached appenders.
+   * @return iterator or null if no attached appenders.
+   */
   public Enumeration getAllAppenders() {
-    synchronized(aai) {
-      return aai.getAllAppenders();
+    synchronized (appenders) {
+      return appenders.getAllAppenders();
     }
   }
 
-  public Appender getAppender(String name) {
-    synchronized(aai) {
-      return aai.getAppender(name);
+  /**
+   * Get appender by name.
+   *
+   * @param name name, may not be null.
+   * @return matching appender or null.
+   */
+  public Appender getAppender(final String name) {
+    synchronized (appenders) {
+      return appenders.getAppender(name);
     }
   }
 
   /**
-     Returns the current value of the <b>LocationInfo</b> option.
-  */
+   * Gets whether the location of the logging request call
+   * should be captured.
+   *
+   * @return the current value of the <b>LocationInfo</b> option.
+   */
   public boolean getLocationInfo() {
     return locationInfo;
   }
 
   /**
-     Is the appender passed as parameter attached to this category?
+   * Determines if specified appender is attached.
+   * @param appender appender.
+   * @return true if attached.
    */
-  public boolean isAttached(Appender appender) {
-    return aai.isAttached(appender);
+  public boolean isAttached(final Appender appender) {
+    synchronized (appenders) {
+      return appenders.isAttached(appender);
+    }
   }
 
-
   /**
-     The <code>AsyncAppender</code> does not require a layout. Hence,
-     this method always returns <code>false</code>. 
-  */
+   * [EMAIL PROTECTED]
+   */
   public boolean requiresLayout() {
     return false;
   }
 
+  /**
+   * Removes and closes all attached appenders.
+   */
   public void removeAllAppenders() {
-    synchronized(aai) {
-      aai.removeAllAppenders();
+    synchronized (appenders) {
+      appenders.removeAllAppenders();
     }
   }
 
-
-  public void removeAppender(Appender appender) {
-    synchronized(aai) {
-      aai.removeAppender(appender);
+  /**
+   * Removes an appender.
+   * @param appender appender to remove.
+   */
+  public void removeAppender(final Appender appender) {
+    synchronized (appenders) {
+      appenders.removeAppender(appender);
     }
   }
 
-  public void removeAppender(String name) {
-    synchronized(aai) {
-      aai.removeAppender(name);
+  /**
+   * Remove appender by name.
+   * @param name name.
+   */
+  public void removeAppender(final String name) {
+    synchronized (appenders) {
+      appenders.removeAppender(name);
     }
   }
 
   /**
-   * The <b>LocationInfo</b> option takes a boolean value. By default,
-   * it is set to false which means there will be no effort to extract
-   * the location information related to the event. As a result, the
-   * event that will be ultimately logged will likely to contain the
-   * wrong location information (if present in the log format).
-   *
-   * <p>Location information extraction is comparatively very slow and
-   * should be avoided unless performance is not a concern.
-   * */
-  public void setLocationInfo(boolean flag) {
+   * The <b>LocationInfo</b> option takes a boolean value. By default, it is
+   * set to false which means there will be no effort to extract the location
+   * information related to the event. As a result, the event that will be
+   * ultimately logged will likely to contain the wrong location information
+   * (if present in the log format).
+   * <p/>
+   * <p/>
+   * Location information extraction is comparatively very slow and should be
+   * avoided unless performance is not a concern.
+   * </p>
+   * @param flag true if location information should be extracted.
+   */
+  public void setLocationInfo(final boolean flag) {
     locationInfo = flag;
   }
 
-
   /**
-   * The <b>BufferSize</b> option takes a non-negative integer value.
-   * This integer value determines the maximum size of the bounded
-   * buffer. Increasing the size of the buffer is always
-   * safe. However, if an existing buffer holds unwritten elements,
-   * then <em>decreasing the buffer size will result in event
-   * loss.</em> Nevertheless, while script configuring the
-   * AsyncAppender, it is safe to set a buffer size smaller than the
-   * [EMAIL PROTECTED] #DEFAULT_BUFFER_SIZE default buffer size} because
-   * configurators guarantee that an appender cannot be used before
-   * being completely configured.  
-   * */
-  public void setBufferSize(int size) {
-    bf.resize(size);
+   * Sets the number of messages allowed in the event buffer
+   * before the calling thread is blocked (if blocking is true)
+   * or until messages are summarized and discarded.  Changing
+   * the size will not affect messages already in the buffer.
+   *
+   * @param size buffer size, must be positive.
+   */
+  public void setBufferSize(final int size) {
+    //
+    //   log4j 1.2 would throw exception if size was negative
+    //      and deadlock if size was zero.
+    //
+    if (size < 0) {
+      throw new java.lang.NegativeArraySizeException("size");
+    }
+
+    synchronized (buffer) {
+      //
+      //   don't let size be zero.
+      //
+      bufferSize = (size < 1) ? 1 : size;
+      buffer.notifyAll();
+    }
   }
 
   /**
-     Returns the current value of the <b>BufferSize</b> option.
+   * Gets the current buffer size.
+   * @return the current value of the <b>BufferSize</b> option.
    */
   public int getBufferSize() {
-    return bf.getMaxSize();
+    return bufferSize;
   }
 
-}
-// 
------------------------------------------------------------------------------
-// 
------------------------------------------------------------------------------
-// ----------------------------------------------------------------------------
-class Dispatcher extends Thread {
-
-  BoundedFIFO bf;
-  AppenderAttachableImpl aai;
-  boolean interrupted = false;
-  AsyncAppender container;
-
-  Dispatcher(BoundedFIFO bf, AsyncAppender container) {
-    this.bf = bf;
-    this.container = container;
-    this.aai = container.aai;    
-    // It is the user's responsibility to close appenders before
-    // exiting. 
-    this.setDaemon(true);
-    // set the dispatcher priority to lowest possible value
-    this.setPriority(Thread.MIN_PRIORITY);
-    this.setName("Dispatcher-"+getName());
-
-    // set the dispatcher priority to MIN_PRIORITY plus or minus 2
-    // depending on the direction of MIN to MAX_PRIORITY.
-    //+ (Thread.MAX_PRIORITY > Thread.MIN_PRIORITY ? 1 : -1)*2);
+  /**
+   * Sets whether appender should wait if there is no
+   * space available in the event buffer or immediately return.
+   *
+   * @param value true if appender should wait until available space in buffer.
+   */
+  public void setBlocking(final boolean value) {
+    synchronized (buffer) {
+      blocking = value;
+      buffer.notifyAll();
+    }
+  }
 
+  /**
+   * Gets whether appender should block calling thread when buffer is full.
+   * If false, messages will be counted by logger and a summary
+   * message appended after the contents of the buffer have been appended.
+   *
+   * @return true if calling thread will be blocked when buffer is full.
+   */
+  public boolean getBlocking() {
+    return blocking;
   }
 
-  void close() {
-    synchronized(bf) {
-      interrupted = true;
-      // We have a waiting dispacther if and only if bf.length is
-      // zero.  In that case, we need to give it a death kiss.
-      if(bf.length() == 0) {
-       bf.notify();
+  /**
+   * Summary of discarded logging events for a logger.
+   */
+  private static final class DiscardSummary {
+    /**
+     * First event of the highest severity.
+     */
+    private LoggingEvent maxEvent;
+
+    /**
+     * Total count of messages discarded.
+     */
+    private int count;
+
+    /**
+     * Create new instance.
+     *
+     * @param event event, may not be null.
+     */
+    public DiscardSummary(final LoggingEvent event) {
+      maxEvent = event;
+      count = 1;
+    }
+
+    /**
+     * Add discarded event to summary.
+     *
+     * @param event event, may not be null.
+     */
+    public void add(final LoggingEvent event) {
+      if (event.getLevel().toInt() > maxEvent.getLevel().toInt()) {
+        maxEvent = event;
       }
+
+      count++;
     }
-  }
 
+    /**
+     * Create event with summary information.
+     *
+     * @return new event.
+     */
+    public LoggingEvent createEvent() {
+      String msg =
+        MessageFormat.format(
+          "Discarded {0} messages due to full event buffer including: {1}",
+          new Object[] { new Integer(count), maxEvent.getMessage() });
 
+      return new LoggingEvent(
+        null, Logger.getLogger(maxEvent.getLoggerName()), maxEvent.getLevel(), 
msg, null);
+    }
+  }
 
   /**
-     The dispatching strategy is to wait until there are events in the
-     buffer to process. After having processed an event, we release
-     the monitor (variable bf) so that new events can be placed in the
-     buffer, instead of keeping the monitor and processing the remaining
-     events in the buffer.
-
-    <p>Other approaches might yield better results.
-
-  */
-  public void run() {
-
-    //Category cat = Category.getInstance(Dispatcher.class.getName());
-
-    LoggingEvent event;
-
-    while(true) {
-      synchronized(bf) {
-       if(bf.length() == 0) {
-         // Exit loop if interrupted but only if the the buffer is empty.
-         if(interrupted) {
-           //cat.info("Exiting.");
-           break;
-         }
-         try {
-           //LogLog.debug("Waiting for new event to dispatch.");
-           bf.wait();
-         } catch(InterruptedException e) {
-           LogLog.error("The dispathcer should not be interrupted.");
-           break;
-         }
-       }
-       event = bf.get();
-       if(bf.wasFull()) {
-         //LogLog.debug("Notifying AsyncAppender about freed space.");
-         bf.notify();
-       }
-      } // synchronized
-
-      // The synchronization on parent is necessary to protect against
-      // operations on the aai object of the parent
-      synchronized(container.aai) {
-       if(aai != null && event != null) {
-         aai.appendLoopOnAppenders(event);
-       }
+   * Event dispatcher.
+   */
+  private static class Dispatcher implements Runnable {
+    /**
+     * Parent AsyncAppender.
+     */
+    private final AsyncAppender parent;
+
+    /**
+     * Event buffer.
+     */
+    private final List buffer;
+
+    /**
+     * Map of DiscardSummary keyed by logger name.
+     */
+    private final Map discardMap;
+
+    /**
+     * Wrapped appenders.
+     */
+    private final AppenderAttachableImpl appenders;
+
+    /**
+     * Create new instance of dispatcher.
+     *
+     * @param parent     parent AsyncAppender, may not be null.
+     * @param buffer     event buffer, may not be null.
+     * @param discardMap discard map, may not be null.
+     * @param appenders  appenders, may not be null.
+     */
+    public Dispatcher(
+      final AsyncAppender parent, final List buffer, final Map discardMap,
+      final AppenderAttachableImpl appenders) {
+
+      this.parent = parent;
+      this.buffer = buffer;
+      this.appenders = appenders;
+      this.discardMap = discardMap;
+    }
+
+    /**
+     * [EMAIL PROTECTED]
+     */
+    public void run() {
+      boolean isActive = true;
+
+      //
+      //   if interrupted (unlikely), end thread
+      //
+      try {
+        //
+        //   loop until the AsyncAppender is closed.
+        //
+        while (isActive) {
+          LoggingEvent[] events = null;
+
+          //
+          //   extract pending events while synchronized
+          //       on buffer
+          //
+          synchronized (buffer) {
+            int bufferSize = buffer.size();
+            isActive = !parent.closed;
+
+            while ((bufferSize == 0) && isActive) {
+              buffer.wait();
+              bufferSize = buffer.size();
+              isActive = !parent.closed;
+            }
+
+            if (bufferSize > 0) {
+              events = new LoggingEvent[bufferSize + discardMap.size()];
+              buffer.toArray(events);
+
+              //
+              //   add events due to buffer overflow
+              //
+              int index = bufferSize;
+
+              for (
+                Iterator iter = discardMap.values().iterator();
+                  iter.hasNext();) {
+                events[index++] = ((DiscardSummary) iter.next()).createEvent();
+              }
+
+              //
+              //    clear buffer and discard map
+              //
+              buffer.clear();
+              discardMap.clear();
+
+              //
+              //    allow blocked appends to continue
+              buffer.notifyAll();
+            }
+          }
+
+          //
+          //   process events after lock on buffer is released.
+          //
+          if (events != null) {
+            for (int i = 0; i < events.length; i++) {
+              synchronized (appenders) {
+                appenders.appendLoopOnAppenders(events[i]);
+              }
+            }
+          }
+        }
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
       }
-    } // while
-    
-    // close and remove all appenders
-    aai.removeAllAppenders();
+    }
   }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to