Author: boisvert
Date: Thu Mar 22 11:04:46 2007
New Revision: 521388

URL: http://svn.apache.org/viewvc?view=rev&rev=521388
Log:
Tweak log messages

Modified:
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/Receiver.java

Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/Receiver.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/Receiver.java?view=diff&rev=521388&r1=521387&r2=521388
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/Receiver.java 
(original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/Receiver.java Thu 
Mar 22 11:04:46 2007
@@ -18,203 +18,188 @@
  */
 package org.apache.ode.jbi;
 
-
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Receiver pulls messages from the NMS and submits them to ODE for further
- * processing.
+ * Receiver pulls messages from the NMS and submits them to ODE for further 
processing.
  */
 public class Receiver implements Runnable {
-  private static final Log __log = LogFactory.getLog(Receiver.class);
+    private static final Log __log = LogFactory.getLog(Receiver.class);
 
-  // default time to wait for MessageExchanges, in seconds
-  private static final long ACCEPT_TIMEOUT = 1L;
+    // default time to wait for MessageExchanges, in seconds
+    private static final long ACCEPT_TIMEOUT = 1L;
 
-  // default time to wait for the ExecutorService to shut down, in seconds
-  private static final long THREADPOOL_SHUTDOWN_TIMEOUT = 10L;
+    // default time to wait for the ExecutorService to shut down, in seconds
+    private static final long THREADPOOL_SHUTDOWN_TIMEOUT = 10L;
 
-  // default number of threads in the thread pool
-  private static final int THREADPOOL_SIZE = 8;
-
-  private OdeContext _odeContext = null;
-
-  private DeliveryChannel _channel = null;
-
-  /** Receiver-Running Flag. */
-  private AtomicBoolean _isRunning = new AtomicBoolean(false);
-
-  /** Receiver-Started Flag. */
-  private AtomicBoolean _isStarted = new AtomicBoolean(false);
-
-  private Thread _thread;
-
-  // thread pool for dispatching received messages
-  private ExecutorService _executorService;
-
-  /**
-   * Constructor for creating instance of this class.
-   * 
-   * @param context
-   *          for receiving environment parameters.
-   */
-  public Receiver(OdeContext context) {
-    _odeContext = context;
-    _thread = new Thread(this);
-    _executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE);
-  }
-
-  /**
-   * Start the receiver thread.
-   */
-  public void start() {
-    if (_isStarted.compareAndSet(false, true)) {
-      _isRunning.set(true);
-      _thread.start();
-    } else
-      throw new IllegalStateException("Receiver cannot be restarted.");
-  }
-
-  /**
-   * This is called to gracefully stop the Receiver thread. After shutting down
-   * the thread pool we wait for a maximum of 10 seconds before forcefully
-   * cancelling in-flight threads.
-   */
-  public void cease() {
-
-    if (!_isStarted.get())
-      return;
-
-    __log.info("Receiver is ceasing.");
-
-    if (_isRunning.compareAndSet(true, false)) {
-      try {
-        // This should not take more ACCEPT_TIMEOUT seconds, we
-        // give it three times as much time.
-        _thread.join(3 * TimeUnit.SECONDS.toMillis(ACCEPT_TIMEOUT));
-
-        // Odd, we should not be alive at this point.
-        if (_thread.isAlive()) {
-          __log
-              .warn("Receiver thread is not dying gracefully; interrupting.");
-          _thread.interrupt();
-        }
+    // default number of threads in the thread pool
+    private static final int THREADPOOL_SIZE = 8;
 
-        // Try joining again.
-        _thread.join(3 * TimeUnit.SECONDS.toMillis(ACCEPT_TIMEOUT));
+    private OdeContext _odeContext = null;
 
-        // If it's not dead yet, we got a problem we can't deal with.
-        if (_thread.isAlive()) {
-          __log
-              .fatal("Receiver thread is not dying gracefully despite our 
insistence!.");
-        }
+    private DeliveryChannel _channel = null;
 
-        // In any case, next step is to shutdown the thread pool
-        _executorService.shutdown();
+    /** Receiver-Running Flag. */
+    private AtomicBoolean _isRunning = new AtomicBoolean(false);
 
-        // make sure no outstanding threads are hanging around
-        if (!_executorService.awaitTermination(THREADPOOL_SHUTDOWN_TIMEOUT,
-            TimeUnit.SECONDS)) {
-          __log
-              .warn("Problem shutting down ExecutorService - trying harder.");
-          List outstanding = _executorService.shutdownNow();
-          if (outstanding != null && !outstanding.isEmpty()) {
-            __log.warn("Cancelled " + outstanding.size()
-                + " in-flight threads.");
-          }
-        }
-      } catch (InterruptedException ie) {
-        __log.warn("Interrupted during cease(): ", ie);
-      }
-
-      // just to be sure..
-      _executorService.shutdown();
-      __log.info("Receiver ceased.");
-
-      _executorService = null;
-      _thread = null;
-      _odeContext = null;
-      _channel = null;
+    /** Receiver-Started Flag. */
+    private AtomicBoolean _isStarted = new AtomicBoolean(false);
+
+    private Thread _thread;
+
+    // thread pool for dispatching received messages
+    private ExecutorService _executorService;
+
+    /**
+     * Constructor for creating instance of this class.
+     * 
+     * @param context
+     *            for receiving environment parameters.
+     */
+    public Receiver(OdeContext context) {
+        _odeContext = context;
+        _thread = new Thread(this);
+        _executorService = Executors.newFixedThreadPool(THREADPOOL_SIZE);
     }
-  }
 
-  /**
-   * We periodically poll for input messages, blocking for 1 sec on the 
accept()
-   * call to receive messages. Depending on runFlag status we either try to
-   * again poll again or exit.
-   */
-  public void run() {
-    __log.info("Receiver is executing.");
-
-    try {
-      _channel = _odeContext.getContext().getDeliveryChannel();
-      if (_channel == null) {
-        __log.fatal("No Channel!");
-        return;
-      }
-    } catch (MessagingException ex) {
-      __log.fatal("Error getting channel! ", ex);
-      return;
+    /**
+     * Start the receiver thread.
+     */
+    public void start() {
+        if (_isStarted.compareAndSet(false, true)) {
+            _isRunning.set(true);
+            _thread.start();
+        } else
+            throw new IllegalStateException("Receiver cannot be restarted.");
     }
 
-    while (_isRunning.get()) {
-      final MessageExchange messageExchange;
-      try {
-        messageExchange = _channel.accept(TimeUnit.SECONDS
-            .toMillis(ACCEPT_TIMEOUT));
-        if (messageExchange != null) {
-          if (__log.isTraceEnabled()) {
-            __log.trace("Got JBI message for endpoint: "
-                + messageExchange.getEndpoint().getEndpointName());
-          }
-
-          // Even if we got a message exchange, we only run it
-          // if we have not been told to cease.
-          if (_isRunning.get()) {
-            __log.trace("Scheduling execution of "
-                + messageExchange.getExchangeId());
-            _executorService.submit(new Runnable() {
-              public void run() {
-                try {
-                  
_odeContext._jbiMessageExchangeProcessor.onJbiMessageExchange(messageExchange);
-                } catch (Throwable t) {
-                  __log.error("Error processing JBI message.", t);
+    /**
+     * This is called to gracefully stop the Receiver thread. After shutting 
down the thread pool we wait for a maximum
+     * of 10 seconds before forcefully cancelling in-flight threads.
+     */
+    public void cease() {
+
+        if (!_isStarted.get())
+            return;
+
+        __log.info("Receiver is ceasing.");
+
+        if (_isRunning.compareAndSet(true, false)) {
+            try {
+                // This should not take more ACCEPT_TIMEOUT seconds, we
+                // give it three times as much time.
+                _thread.join(3 * TimeUnit.SECONDS.toMillis(ACCEPT_TIMEOUT));
+
+                // Odd, we should not be alive at this point.
+                if (_thread.isAlive()) {
+                    __log.warn("Receiver thread is not dying gracefully; 
interrupting.");
+                    _thread.interrupt();
                 }
-              }
-            });
-          } else {
-            __log.warn("Skipping processing of message exchange "
-                + messageExchange.getExchangeId()
-                + "; component no longer active.");
-          }
-        }
-      } catch (MessagingException mex) {
-        if (_isRunning.get())
-          __log.warn("Receiver exiting due to MessagingException:", mex);
-        else
-          __log.info("Receiver finished.");
-        break;
-      } catch (Exception ex) {
-        if (!_isRunning.get()) {
-          __log.info("Receiver finished.");
-          break;
+
+                // Try joining again.
+                _thread.join(3 * TimeUnit.SECONDS.toMillis(ACCEPT_TIMEOUT));
+
+                // If it's not dead yet, we got a problem we can't deal with.
+                if (_thread.isAlive()) {
+                    __log.fatal("Receiver thread is not dying gracefully 
despite our insistence!.");
+                }
+
+                // In any case, next step is to shutdown the thread pool
+                _executorService.shutdown();
+
+                // make sure no outstanding threads are hanging around
+                if 
(!_executorService.awaitTermination(THREADPOOL_SHUTDOWN_TIMEOUT, 
TimeUnit.SECONDS)) {
+                    __log.warn("Problem shutting down ExecutorService - trying 
harder.");
+                    List outstanding = _executorService.shutdownNow();
+                    if (outstanding != null && !outstanding.isEmpty()) {
+                        __log.warn("Cancelled " + outstanding.size() + " 
in-flight threads.");
+                    }
+                }
+            } catch (InterruptedException ie) {
+                __log.warn("Interrupted during cease(): ", ie);
+            }
+
+            // just to be sure..
+            _executorService.shutdown();
+            __log.info("Receiver ceased.");
+
+            _executorService = null;
+            _thread = null;
+            _odeContext = null;
+            _channel = null;
         }
-        __log.warn("Caught unexpected Exception: ", ex);
-        return;
-      }
     }
 
-    __log.info("Receiver finished.");
-  }
+    /**
+     * We periodically poll for input messages, blocking for 1 sec on the 
accept() call to receive messages. Depending
+     * on runFlag status we either try to again poll again or exit.
+     */
+    public void run() {
+        __log.info("Receiver is executing.");
+
+        try {
+            _channel = _odeContext.getContext().getDeliveryChannel();
+            if (_channel == null) {
+                __log.fatal("No Channel!");
+                return;
+            }
+        } catch (MessagingException ex) {
+            __log.fatal("Error getting channel! ", ex);
+            return;
+        }
+
+        while (_isRunning.get()) {
+            final MessageExchange messageExchange;
+            try {
+                messageExchange = 
_channel.accept(TimeUnit.SECONDS.toMillis(ACCEPT_TIMEOUT));
+                if (messageExchange != null) {
+                    if (__log.isTraceEnabled()) {
+                        __log.trace("Got JBI message for endpoint: " + 
messageExchange.getEndpoint().getEndpointName());
+                    }
+
+                    // Even if we got a message exchange, we only run it
+                    // if we have not been told to cease.
+                    if (_isRunning.get()) {
+                        if (__log.isTraceEnabled()) {
+                            __log.trace("Scheduling execution of " + 
messageExchange.getExchangeId());
+                        }
+                        _executorService.submit(new Runnable() {
+                            public void run() {
+                                try {
+                                    
_odeContext._jbiMessageExchangeProcessor.onJbiMessageExchange(messageExchange);
+                                } catch (Throwable t) {
+                                    __log.error("Error processing JBI 
message.", t);
+                                }
+                            }
+                        });
+                    } else {
+                        __log.warn("Skipping processing of message exchange " 
+ messageExchange.getExchangeId()
+                                + "; component no longer active.");
+                    }
+                }
+            } catch (MessagingException mex) {
+                if (_isRunning.get())
+                    __log.warn("Receiver exiting due to MessagingException:", 
mex);
+                else
+                    __log.info("Receiver finished.");
+                break;
+            } catch (Exception ex) {
+                if (!_isRunning.get()) {
+                    __log.info("Receiver finished.");
+                    break;
+                }
+                __log.warn("Caught unexpected Exception: ", ex);
+                return;
+            }
+        }
+
+        __log.info("Receiver finished.");
+    }
 }


Reply via email to