Author: pdodds
Date: Thu Sep 21 07:31:41 2006
New Revision: 448564

URL: http://svn.apache.org/viewvc?view=rev&rev=448564
Log:
Refactored to add support for handling classloaders from within the
ServiceUnit. If a ServiceUnit implementation is going to provide Java classes,
then it should return a classloader for that SU (see SM-591).

Modified:
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?view=diff&rev=448564&r1=448563&r2=448564
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
 Thu Sep 21 07:31:41 2006
@@ -44,8 +44,8 @@
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Base class for life cycle management of components.
- * This class may be used as is.
+ * Base class for life cycle management of components. This class may be used 
as
+ * is.
  * 
  * @author Guillaume Nodet
  * @version $Revision: 399873 $
@@ -53,421 +53,495 @@
  */
 public class AsyncBaseLifeCycle implements ComponentLifeCycle {
 
-    protected final transient Log logger;
-    
-    protected BaseComponent component;
-    protected ComponentContext context;
-    protected ObjectName mbeanName;
-    protected WorkManager workManager;
-    protected AtomicBoolean running;
-    protected DeliveryChannel channel;
-    protected Thread poller;
-    protected AtomicBoolean polling;
-    protected TransactionManager transactionManager;
-    protected boolean workManagerCreated;
-    protected Map processors = new ConcurrentHashMap();
-    
-    
-    public AsyncBaseLifeCycle(BaseComponent component) {
-        this.component = component;
-        this.logger = component.logger;
-        this.running = new AtomicBoolean(false);
-        this.polling = new AtomicBoolean(false);
-        this.processors = new ConcurrentHashMap();
-    }
-    
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
-     */
-    public ObjectName getExtensionMBeanName() {
-        return mbeanName;
-    }
-
-    protected Object getExtensionMBean() throws Exception {
-        return null;
-    }
-    
-    protected ObjectName createExtensionMBeanName() throws Exception {
-        return 
this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
-    }
-    
-    protected QName getEPRServiceName() {
-        return null;
-    }
-
-    /* (non-Javadoc)
-     * @see 
javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
-     */
-    public void init(ComponentContext context) throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Initializing component");
-            }
-            this.context = context;
-            this.channel = context.getDeliveryChannel();
-            try {
-                this.transactionManager = (TransactionManager) 
context.getTransactionManager();
-            } catch (Throwable e) {
-              // Ignore, this is just a safeguard against non compliant 
-              // JBI implementation which throws an exception instead of
-              // return null
-            }
-            doInit();
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component initialized");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling init", e);
-        }
-    }
-
-    protected void doInit() throws Exception {
-        // Register extension mbean
-        Object mbean = getExtensionMBean();
-        if (mbean != null) {
-            MBeanServer server = this.context.getMBeanServer();
-            if (server == null) {
-                // TODO: log a warning ?
-                //throw new JBIException("null mBeanServer");
-            } else {
-                this.mbeanName = createExtensionMBeanName();
-                if (server.isRegistered(this.mbeanName)) {
-                    server.unregisterMBean(this.mbeanName);
-                }
-                server.registerMBean(mbean, this.mbeanName);
-            }
-        }
-        // Obtain or create the work manager
-        // When using the WorkManager from ServiceMix,
-        // some class loader problems can appear when
-        // trying to uninstall the components.
-        // Some threads owned by the work manager have a 
-        // security context referencing the component class loader
-        // so that every loaded classes are locked
-        //this.workManager = findWorkManager();
-        if (this.workManager == null) {
-            this.workManagerCreated = true;
-            this.workManager = createWorkManager();
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#shutDown()
-     */
-    public void shutDown() throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Shutting down component");
-            }
-            doShutDown();
-            this.context = null;
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component shut down");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling shutdown", e);
-        }
-    }
-
-    protected void doShutDown() throws Exception {
-        // Unregister mbean
-        if (this.mbeanName != null) {
-            MBeanServer server = this.context.getMBeanServer();
-            if (server == null) {
-                throw new JBIException("null mBeanServer");
-            }
-            if (server.isRegistered(this.mbeanName)) {
-                server.unregisterMBean(this.mbeanName);
-            }
-        }
-        // Destroy work manager, if created
-        if (this.workManagerCreated) {
-            if (this.workManager instanceof BasicWorkManager) {
-                ((BasicWorkManager) this.workManager).shutDown();
-            }
-            this.workManager = null;
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#start()
-     */
-    public void start() throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Starting component");
-            }
-            if (this.running.compareAndSet(false, true)) {
-                doStart();
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component started");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling start", e);
-        }
-    }
-
-    protected void doStart() throws Exception {
-        synchronized (this.polling) {
-            workManager.startWork(new Work() {
-                public void release() { }
-                public void run() {
-                    poller = Thread.currentThread();
-                    pollDeliveryChannel();
-                }
-            });
-            polling.wait();
-        }
-    }
-    
-    protected void pollDeliveryChannel() {
-        synchronized (polling) {
-            polling.set(true);
-            polling.notify();
-        }
-        while (running.get()) {
-            try {
-                final MessageExchange exchange = channel.accept(1000L);
-                if (exchange != null) {
-                    final Transaction tx = (Transaction) 
exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
-                    if (tx != null) {
-                        if (transactionManager == null) {
-                            throw new IllegalStateException("Exchange is 
enlisted in a transaction, but no transaction manager is available");
-                        }
-                        transactionManager.suspend();
-                    }
-                    workManager.scheduleWork(new Work() {
-                        public void release() {
-                        }
-                        public void run() {
-                            processExchangeInTx(exchange, tx);
-                        }
-                    });
-                }
-            } catch (Throwable t) {
-                if (running.get() == false) {
-                    // Should have been interrupted, discard the throwable
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Polling thread will stop");
-                    }
-                } else {
-                    logger.error("Error polling delivery channel", t);
-                }
-            }
-        }
-        synchronized (polling) {
-            polling.set(false);
-            polling.notify();
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see javax.jbi.component.ComponentLifeCycle#stop()
-     */
-    public void stop() throws JBIException {
-        try {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Stopping component");
-            }
-            if (this.running.compareAndSet(true, false)) {
-                doStop();
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Component stopped");
-            }
-        } catch (JBIException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new JBIException("Error calling stop", e);
-        }
-    }
-
-    protected void doStop() throws Exception {
-        // Interrupt the polling thread and await termination
-        try {
-            synchronized (polling) {
-                if (polling.get()) {
-                    poller.interrupt();
-                    polling.wait();
-                }
-            }
-        } finally {
-            poller = null;
-        }
-    }
-
-    /**
-     * @return Returns the context.
-     */
-    public ComponentContext getContext() {
-        return context;
-    }
-
-    public WorkManager getWorkManager() {
-        return workManager;
-    }
-
-    protected WorkManager createWorkManager() {
-        // Create a very simple one
-        return new BasicWorkManager();
-    }
-
-    protected WorkManager findWorkManager() {
-        // If inside ServiceMix, retrieve its work manager
-        try {
-            Method getContainerMth = 
context.getClass().getMethod("getContainer", new Class[0]);
-            Object container = getContainerMth.invoke(context, new Object[0]);
-            Method getWorkManagerMth = 
container.getClass().getMethod("getWorkManager", new Class[0]);
-            return (WorkManager) getWorkManagerMth.invoke(container, new 
Object[0]);
-        } catch (Throwable t) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("JBI container is not ServiceMix. Will create our 
own WorkManager", t);
-            }
-        }
-        // TODO: should look in jndi for an existing work manager
-        return null;
-    }
-    
-    protected void processExchangeInTx(MessageExchange exchange, Transaction 
tx) {
-        try {
-            if (tx != null) {
-                transactionManager.resume(tx);
-            }
-            processExchange(exchange);
-        } catch (Exception e) {
-            logger.error("Error processing exchange " + exchange, e);
-            try {
-                // If we are transacted, check if this exception should
-                // rollback the transaction
-                if (transactionManager != null && 
-                    transactionManager.getStatus() == Status.STATUS_ACTIVE && 
-                    exceptionShouldRollbackTx(e)) {
-                    transactionManager.setRollbackOnly();
-                }
-                exchange.setError(e);
-                channel.send(exchange);
-            } catch (Exception inner) {
-                logger.error("Error setting exchange status to ERROR", inner);
-            }
-        } finally {
-            try {
-                // Check transaction status
-                if (tx != null) {
-                    int status = transactionManager.getStatus();
-                    // We use pull delivery, so the transaction should already
-                    // have been transfered to another thread because the 
component
-                    // must have answered.
-                    if (status != Status.STATUS_NO_TRANSACTION) {
-                        logger.error("Transaction is still active after 
exchange processing. Trying to rollback transaction.");
-                        try {
-                            transactionManager.rollback();
-                        } catch (Throwable t) {
-                            logger.error("Error trying to rollback 
transaction.", t);
-                        }
-                    }
-                }
-            } catch (Throwable t) {
-                logger.error("Error checking transaction status.", t);
-            }
-        }
-    }
-    
-    protected boolean exceptionShouldRollbackTx(Exception e) {
-        return false;
-    }
-    
-    public void processExchange(MessageExchange exchange) throws Exception {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Received exchange: status: " + exchange.getStatus() 
+ ", role: " + 
-                                (exchange.getRole() == Role.CONSUMER ? 
"consumer" : "provider"));
-        }
-        if (exchange.getRole() == Role.PROVIDER) {
-            boolean dynamic = false;
-            ServiceEndpoint endpoint = exchange.getEndpoint();
-            String key = EndpointSupport.getKey(exchange.getEndpoint());
-            Endpoint ep = (Endpoint) 
this.component.getRegistry().getEndpoint(key);
-            if (ep == null) {
-                if (endpoint.getServiceName().equals(getEPRServiceName())) {
-                    ep = getResolvedEPR(exchange.getEndpoint());
-                    dynamic = true;
-                } 
-                if (ep == null) {
-                    throw new IllegalStateException("Endpoint not found: " + 
key);
-                }
-            }
-            ExchangeProcessor processor = ep.getProcessor();
-            if (processor == null) {
-                throw new IllegalStateException("No processor found for 
endpoint: " + key);
-            }
-            try {
-                processor.process(exchange);
-            } finally {
-                // If the endpoint is dynamic, deactivate it
-                if (dynamic) {
-                    ep.deactivate();
-                }
-            }
-        } else {
-            ExchangeProcessor processor = null;
-            if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
-                String key = 
exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
-                Endpoint ep = (Endpoint) 
this.component.getRegistry().getEndpoint(key);
-                if (ep != null) {
-                    processor = ep.getProcessor();
-                }
-            } else {
-                processor = (ExchangeProcessor) 
processors.remove(exchange.getExchangeId());
-            }
-            if (processor == null) {
-                throw new IllegalStateException("No processor found for: " + 
exchange.getExchangeId());
-            }
-            processor.process(exchange);
-        }
-    }
-
-    /**
-     * 
-     * @param exchange
-     * @param processor
-     * @throws MessagingException
-     * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
-     */
-    public void sendConsumerExchange(MessageExchange exchange, 
ExchangeProcessor processor) throws MessagingException {
-        // If the exchange is not ACTIVE, no answer is expected
-        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-            processors.put(exchange.getExchangeId(), processor);
-        }
-        channel.send(exchange);
-    }
-    
-    /**
-     * This method allows the component to keep no state in memory so that
-     * components can be clustered and provide fail-over and load-balancing.
-     * @param exchange
-     * @param endpoint
-     * @throws MessagingException
-     */
-    public void sendConsumerExchange(MessageExchange exchange, Endpoint 
endpoint) throws MessagingException {
-        String key = EndpointSupport.getKey(endpoint);
-        exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
-        channel.send(exchange);
-    }
-    
-    /**
-     * Handle an exchange sent to an EPR resolved by this component
-     * @param exchange
-     * @return an endpoint to use for handling the exchange
-     * @throws Exception
-     */
-    protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
-        throw new UnsupportedOperationException("Component does not handle EPR 
exchanges");
-    }
+       protected final transient Log logger;
+
+       protected BaseComponent component;
+
+       protected ComponentContext context;
+
+       protected ObjectName mbeanName;
+
+       protected WorkManager workManager;
+
+       protected AtomicBoolean running;
+
+       protected DeliveryChannel channel;
+
+       protected Thread poller;
+
+       protected AtomicBoolean polling;
+
+       protected TransactionManager transactionManager;
+
+       protected boolean workManagerCreated;
+
+       protected Map processors = new ConcurrentHashMap();
+
+       public AsyncBaseLifeCycle(BaseComponent component) {
+               this.component = component;
+               this.logger = component.logger;
+               this.running = new AtomicBoolean(false);
+               this.polling = new AtomicBoolean(false);
+               this.processors = new ConcurrentHashMap();
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
+        */
+       public ObjectName getExtensionMBeanName() {
+               return mbeanName;
+       }
+
+       protected Object getExtensionMBean() throws Exception {
+               return null;
+       }
+
+       protected ObjectName createExtensionMBeanName() throws Exception {
+               return 
this.context.getMBeanNames().createCustomComponentMBeanName(
+                               "Configuration");
+       }
+
+       protected QName getEPRServiceName() {
+               return null;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see 
javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
+        */
+       public void init(ComponentContext context) throws JBIException {
+               try {
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Initializing component");
+                       }
+                       this.context = context;
+                       this.channel = context.getDeliveryChannel();
+                       try {
+                               this.transactionManager = (TransactionManager) 
context
+                                               .getTransactionManager();
+                       } catch (Throwable e) {
+                               // Ignore, this is just a safeguard against non 
compliant
+                               // JBI implementation which throws an exception 
instead of
+                               // return null
+                       }
+                       doInit();
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Component initialized");
+                       }
+               } catch (JBIException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new JBIException("Error calling init", e);
+               }
+       }
+
+       protected void doInit() throws Exception {
+               // Register extension mbean
+               Object mbean = getExtensionMBean();
+               if (mbean != null) {
+                       MBeanServer server = this.context.getMBeanServer();
+                       if (server == null) {
+                               // TODO: log a warning ?
+                               // throw new JBIException("null mBeanServer");
+                       } else {
+                               this.mbeanName = createExtensionMBeanName();
+                               if (server.isRegistered(this.mbeanName)) {
+                                       server.unregisterMBean(this.mbeanName);
+                               }
+                               server.registerMBean(mbean, this.mbeanName);
+                       }
+               }
+               // Obtain or create the work manager
+               // When using the WorkManager from ServiceMix,
+               // some class loader problems can appear when
+               // trying to uninstall the components.
+               // Some threads owned by the work manager have a
+               // security context referencing the component class loader
+               // so that every loaded classes are locked
+               // this.workManager = findWorkManager();
+               if (this.workManager == null) {
+                       this.workManagerCreated = true;
+                       this.workManager = createWorkManager();
+               }
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see javax.jbi.component.ComponentLifeCycle#shutDown()
+        */
+       public void shutDown() throws JBIException {
+               try {
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Shutting down component");
+                       }
+                       doShutDown();
+                       this.context = null;
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Component shut down");
+                       }
+               } catch (JBIException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new JBIException("Error calling shutdown", e);
+               }
+       }
+
+       protected void doShutDown() throws Exception {
+               // Unregister mbean
+               if (this.mbeanName != null) {
+                       MBeanServer server = this.context.getMBeanServer();
+                       if (server == null) {
+                               throw new JBIException("null mBeanServer");
+                       }
+                       if (server.isRegistered(this.mbeanName)) {
+                               server.unregisterMBean(this.mbeanName);
+                       }
+               }
+               // Destroy work manager, if created
+               if (this.workManagerCreated) {
+                       if (this.workManager instanceof BasicWorkManager) {
+                               ((BasicWorkManager) 
this.workManager).shutDown();
+                       }
+                       this.workManager = null;
+               }
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see javax.jbi.component.ComponentLifeCycle#start()
+        */
+       public void start() throws JBIException {
+               try {
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Starting component");
+                       }
+                       if (this.running.compareAndSet(false, true)) {
+                               doStart();
+                       }
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Component started");
+                       }
+               } catch (JBIException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new JBIException("Error calling start", e);
+               }
+       }
+
+       protected void doStart() throws Exception {
+               synchronized (this.polling) {
+                       workManager.startWork(new Work() {
+                               public void release() {
+                               }
+
+                               public void run() {
+                                       poller = Thread.currentThread();
+                                       pollDeliveryChannel();
+                               }
+                       });
+                       polling.wait();
+               }
+       }
+
+       protected void pollDeliveryChannel() {
+               synchronized (polling) {
+                       polling.set(true);
+                       polling.notify();
+               }
+               while (running.get()) {
+                       try {
+                               final MessageExchange exchange = 
channel.accept(1000L);
+                               if (exchange != null) {
+                                       final Transaction tx = (Transaction) 
exchange
+                                                       
.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
+                                       if (tx != null) {
+                                               if (transactionManager == null) 
{
+                                                       throw new 
IllegalStateException(
+                                                                       
"Exchange is enlisted in a transaction, but no transaction manager is 
available");
+                                               }
+                                               transactionManager.suspend();
+                                       }
+                                       workManager.scheduleWork(new Work() {
+                                               public void release() {
+                                               }
+
+                                               public void run() {
+                                                       
processExchangeInTx(exchange, tx);
+                                               }
+                                       });
+                               }
+                       } catch (Throwable t) {
+                               if (running.get() == false) {
+                                       // Should have been interrupted, 
discard the throwable
+                                       if (logger.isDebugEnabled()) {
+                                               logger.debug("Polling thread 
will stop");
+                                       }
+                               } else {
+                                       logger.error("Error polling delivery 
channel", t);
+                               }
+                       }
+               }
+               synchronized (polling) {
+                       polling.set(false);
+                       polling.notify();
+               }
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see javax.jbi.component.ComponentLifeCycle#stop()
+        */
+       public void stop() throws JBIException {
+               try {
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Stopping component");
+                       }
+                       if (this.running.compareAndSet(true, false)) {
+                               doStop();
+                       }
+                       if (logger.isDebugEnabled()) {
+                               logger.debug("Component stopped");
+                       }
+               } catch (JBIException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new JBIException("Error calling stop", e);
+               }
+       }
+
+       protected void doStop() throws Exception {
+               // Interrupt the polling thread and await termination
+               try {
+                       synchronized (polling) {
+                               if (polling.get()) {
+                                       poller.interrupt();
+                                       polling.wait();
+                               }
+                       }
+               } finally {
+                       poller = null;
+               }
+       }
+
+       /**
+        * @return Returns the context.
+        */
+       public ComponentContext getContext() {
+               return context;
+       }
+
+       public WorkManager getWorkManager() {
+               return workManager;
+       }
+
+       protected WorkManager createWorkManager() {
+               // Create a very simple one
+               return new BasicWorkManager();
+       }
+
+       protected WorkManager findWorkManager() {
+               // If inside ServiceMix, retrieve its work manager
+               try {
+                       Method getContainerMth = context.getClass().getMethod(
+                                       "getContainer", new Class[0]);
+                       Object container = getContainerMth.invoke(context, new 
Object[0]);
+                       Method getWorkManagerMth = 
container.getClass().getMethod(
+                                       "getWorkManager", new Class[0]);
+                       return (WorkManager) getWorkManagerMth.invoke(container,
+                                       new Object[0]);
+               } catch (Throwable t) {
+                       if (logger.isDebugEnabled()) {
+                               logger
+                                               .debug(
+                                                               "JBI container 
is not ServiceMix. Will create our own WorkManager",
+                                                               t);
+                       }
+               }
+               // TODO: should look in jndi for an existing work manager
+               return null;
+       }
+       /**
+        * Processes an exchange, first resuming the given transaction on the
+        * current thread when one is supplied.
+        * <p>
+        * An exception thrown by the resume or by processExchange is logged and
+        * reported on the exchange itself: the exchange is put in error and sent
+        * back through the channel. Before that, if a transaction manager is
+        * set, its status is STATUS_ACTIVE and
+        * {@link #exceptionShouldRollbackTx(Exception)} returns true, the
+        * transaction is marked rollback-only. A failure while reporting the
+        * error is only logged.
+        * <p>
+        * Finally, when a transaction was supplied and the transaction manager
+        * reports any status other than STATUS_NO_TRANSACTION, an error is
+        * logged and a rollback is attempted. Exceptions are not propagated to
+        * the caller.
+        * 
+        * @param exchange
+        *            the exchange to process
+        * @param tx
+        *            the transaction to resume before processing, or null
+        */
+       protected void processExchangeInTx(MessageExchange exchange, 
Transaction tx) {
+               try {
+                       if (tx != null) {
+                               transactionManager.resume(tx);
+                       }
+                       processExchange(exchange);
+               } catch (Exception e) {
+                       logger.error("Error processing exchange " + exchange, 
e);
+                       try {
+                               // If we are transacted, check if this 
exception should
+                               // rollback the transaction
+                               if (transactionManager != null
+                                               && 
transactionManager.getStatus() == Status.STATUS_ACTIVE
+                                               && 
exceptionShouldRollbackTx(e)) {
+                                       transactionManager.setRollbackOnly();
+                               }
+                               exchange.setError(e);
+                               channel.send(exchange);
+                       } catch (Exception inner) {
+                               logger.error("Error setting exchange status to 
ERROR", inner);
+                       }
+               } finally {
+                       try {
+                               // Check transaction status
+                               if (tx != null) {
+                                       int status = 
transactionManager.getStatus();
+                                       // We use pull delivery, so the 
transaction should already
+                                       // have been transfered to another 
thread because the
+                                       // component
+                                       // must have answered.
+                                       if (status != 
Status.STATUS_NO_TRANSACTION) {
+                                               logger
+                                                               
.error("Transaction is still active after exchange processing. Trying to 
rollback transaction.");
+                                               try {
+                                                       
transactionManager.rollback();
+                                               } catch (Throwable t) {
+                                                       logger.error(
+                                                                       "Error 
trying to rollback transaction.", t);
+                                               }
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               logger.error("Error checking transaction 
status.", t);
+                       }
+               }
+       }
+       /**
+        * Tells processExchangeInTx whether an exception caught while processing
+        * an exchange should mark the active transaction rollback-only.
+        * <p>
+        * This base implementation always answers false.
+        * 
+        * @param e
+        *            the exception caught while processing the exchange
+        * @return true to mark the transaction rollback-only; always false here
+        */
+       protected boolean exceptionShouldRollbackTx(Exception e) {
+               return false;
+       }
+       /**
+        * Dispatches an exchange to the processor responsible for it.
+        * <p>
+        * When this component is the provider, the target endpoint is looked up
+        * in the component registry by its key. If it is not registered and the
+        * endpoint's service name equals getEPRServiceName(), the endpoint is
+        * obtained from {@link #getResolvedEPR(ServiceEndpoint)} and is
+        * deactivated again once the exchange has been processed.
+        * <p>
+        * Otherwise the processor is found in one of two ways: if the exchange
+        * carries the JbiConstants.SENDER_ENDPOINT property, the endpoint
+        * registered under that key supplies the processor; if not, the
+        * processor stored under the exchange id is removed from the processors
+        * map. Note that on this second path no endpoint is known, so doProcess
+        * is called with a null endpoint.
+        * 
+        * @param exchange
+        *            the exchange to process
+        * @throws IllegalStateException
+        *             if no endpoint or no processor can be found
+        * @throws Exception
+        *             any exception raised while resolving the endpoint,
+        *             processing the exchange or deactivating a dynamic endpoint
+        */
+       protected void processExchange(MessageExchange exchange) throws 
Exception {
+               if (logger.isDebugEnabled()) {
+                       logger.debug("Received exchange: status: "
+                                       + exchange.getStatus()
+                                       + ", role: "
+                                       + (exchange.getRole() == Role.CONSUMER 
? "consumer"
+                                                       : "provider"));
+               }
+               if (exchange.getRole() == Role.PROVIDER) {
+                       boolean dynamic = false;
+                       ServiceEndpoint endpoint = exchange.getEndpoint();
+                       String key = 
EndpointSupport.getKey(exchange.getEndpoint());
+                       Endpoint ep = (Endpoint) 
this.component.getRegistry().getEndpoint(
+                                       key);
+                       if (ep == null) {
+                               if 
(endpoint.getServiceName().equals(getEPRServiceName())) {
+                                       ep = 
getResolvedEPR(exchange.getEndpoint());
+                                       dynamic = true;
+                               }
+                               if (ep == null) {
+                                       throw new 
IllegalStateException("Endpoint not found: "
+                                                       + key);
+                               }
+                       }
+                       ExchangeProcessor processor = ep.getProcessor();
+                       if (processor == null) {
+                               throw new IllegalStateException(
+                                               "No processor found for 
endpoint: " + key);
+                       }
+                       try {
+                               doProcess(ep, processor, exchange);
+                       } finally {
+                               // If the endpoint is dynamic, deactivate it
+                               if (dynamic) {
+                                       ep.deactivate();
+                               }
+                       }
+               } else {
+                       ExchangeProcessor processor = null;
+                       Endpoint ep = null;
+                       if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) 
!= null) {
+                               String key = 
exchange.getProperty(JbiConstants.SENDER_ENDPOINT)
+                                               .toString();
+                               ep = (Endpoint) this.component.getRegistry()
+                                               .getEndpoint(key);
+                               if (ep != null) {
+                                       processor = ep.getProcessor();
+                               }
+                       } else {
+                               processor = (ExchangeProcessor) 
processors.remove(exchange
+                                               .getExchangeId());
+                       }
+                       if (processor == null) {
+                               throw new IllegalStateException("No processor 
found for: "
+                                               + exchange.getExchangeId());
+                       }
+                       doProcess(ep,processor,exchange);
+               }
+
+       }
+
+       /**
+        * Thin wrapper around the call to the processor which ensures that the
+        * class loader of the endpoint's service unit, where available, is the
+        * thread context class loader while the exchange is processed. The
+        * previous context class loader is always restored afterwards.
+        * <p>
+        * The endpoint may be null: consumer exchanges sent through the
+        * deprecated sendConsumerExchange(MessageExchange, ExchangeProcessor)
+        * are matched to their processor by exchange id only, so no endpoint is
+        * known for them. In that case, or when the endpoint has no service
+        * unit or the service unit provides no class loader, the context class
+        * loader is left untouched.
+        * 
+        * @param ep
+        *            the endpoint handling the exchange, or null if unknown
+        * @param processor
+        *            the processor to invoke
+        * @param exchange
+        *            the exchange to hand to the processor
+        * @throws Exception
+        *             any exception raised while obtaining the class loader or
+        *             by the processor
+        */
+       private void doProcess(Endpoint ep, ExchangeProcessor processor,
+                       MessageExchange exchange) throws Exception {
+               Thread thread = Thread.currentThread();
+               ClassLoader oldCl = thread.getContextClassLoader();
+               try {
+                       // Guard every step of the lookup: a null endpoint used
+                       // to fail here with a NullPointerException before the
+                       // processor was even called.
+                       ServiceUnit su = null;
+                       if (ep != null) {
+                               su = ep.getServiceUnit();
+                       }
+                       ClassLoader classLoader = null;
+                       if (su != null) {
+                               classLoader = su.getConfigurationClassLoader();
+                       }
+                       if (classLoader != null) {
+                               thread.setContextClassLoader(classLoader);
+                       }
+
+                       processor.process(exchange);
+               } finally {
+                       thread.setContextClassLoader(oldCl);
+               }
+       }
+
+       /**
+        * Sends a consumer exchange through the delivery channel, remembering
+        * the processor that must handle the answer.
+        * <p>
+        * The processor is stored in the processors map under the exchange id,
+        * but only while the exchange is ACTIVE, since no answer is expected
+        * otherwise. processExchange removes it from the map when the exchange
+        * comes back.
+        * 
+        * @param exchange
+        *            the exchange to send
+        * @param processor
+        *            the processor to call when the exchange comes back
+        * @throws MessagingException
+        *             if the exchange cannot be sent
+        * @deprecated use sendConsumerExchange(MessageExchange, Endpoint)
+        *             instead
+        */
+       public void sendConsumerExchange(MessageExchange exchange,
+                       ExchangeProcessor processor) throws MessagingException {
+               // If the exchange is not ACTIVE, no answer is expected
+               if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                       processors.put(exchange.getExchangeId(), processor);
+               }
+               channel.send(exchange);
+       }
+
+       /**
+        * Sends a consumer exchange through the delivery channel after storing
+        * the key of the sending endpoint in the exchange's
+        * JbiConstants.SENDER_ENDPOINT property. processExchange reads that
+        * property to find the endpoint, and through it the processor, when the
+        * exchange comes back.
+        * <p>
+        * This method allows the component to keep no state in memory so that
+        * components can be clustered and provide fail-over and load-balancing.
+        * 
+        * @param exchange
+        *            the exchange to send
+        * @param endpoint
+        *            the endpoint sending the exchange
+        * @throws MessagingException
+        *             if the exchange cannot be sent
+        */
+       public void sendConsumerExchange(MessageExchange exchange, Endpoint 
endpoint)
+                       throws MessagingException {
+               String key = EndpointSupport.getKey(endpoint);
+               exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
+               channel.send(exchange);
+       }
+
+       /**
+        * Handle an exchange sent to an EPR resolved by this component.
+        * <p>
+        * processExchange calls this method when the target endpoint is not in
+        * the registry and its service name equals getEPRServiceName(). This
+        * base implementation does not support such exchanges and always
+        * throws.
+        * 
+        * @param ep
+        *            the service endpoint the exchange is addressed to
+        * @return an endpoint to use for handling the exchange
+        * @throws UnsupportedOperationException
+        *             always, in this base implementation
+        * @throws Exception
+        *             declared for overriding implementations
+        */
+       protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
+               throw new UnsupportedOperationException(
+                               "Component does not handle EPR exchanges");
+       }
 
 }

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java?view=diff&rev=448564&r1=448563&r2=448564
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
 Thu Sep 21 07:31:41 2006
@@ -26,114 +26,126 @@
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
 
+import org.apache.xbean.kernel.ServiceNotFoundException;
+
 public class ServiceUnit {
 
-    protected BaseComponent component;
-    protected String name;
-    protected String rootPath;
-    protected String status = LifeCycleMBean.SHUTDOWN;
-    protected Map endpoints = new HashMap();
-    
-    public ServiceUnit() {
-    }
-    
-    public ServiceUnit(BaseComponent component) {
-        this.component = component;
-    }
-    
-    public void start() throws Exception {
-        // Activate endpoints
-        List activated = new ArrayList();
-        try {
-            for (Iterator iter = getEndpoints().iterator(); iter.hasNext();) {
-                Endpoint endpoint = (Endpoint) iter.next();
-                endpoint.activate();
-                activated.add(endpoint);
-            }
-            this.status = LifeCycleMBean.STARTED;
-        } catch (Exception e) {
-            // Deactivate activated endpoints
-            for (Iterator iter = activated.iterator(); iter.hasNext();) {
-                try {
-                    Endpoint endpoint = (Endpoint) iter.next();
-                    endpoint.deactivate();
-                } catch (Exception e2) {
-                    // do nothing
-                }
-            }
-            throw e;
-        }
-    }
-    
-    public void stop() throws Exception {
-        this.status = LifeCycleMBean.STOPPED;
-        // Deactivate endpoints
-        Exception exception = null;
-        for (Iterator iter = getEndpoints().iterator(); iter.hasNext();) {
-            Endpoint endpoint = (Endpoint) iter.next();
-            try {
-                endpoint.deactivate();
-            } catch (Exception e) {
-                exception = e;
-            }
-        }
-        if (exception != null) {
-            throw exception;
-        }
-    }
-    
-    public void shutDown() throws JBIException {
-        this.status = LifeCycleMBean.SHUTDOWN;
-    }
-    
-    public String getCurrentState() {
-        return status;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getRootPath() {
-        return rootPath;
-    }
-
-    public void setRootPath(String rootPath) {
-        this.rootPath = rootPath;
-    }
-    
-    /**
-     * @return Returns the component.
-     */
-    public BaseComponent getComponent() {
-        return component;
-    }
-    
-    /**
-     * @param component The component to set.
-     */
-    public void setComponent(BaseComponent component) {
-        this.component = component;
-    }
-    
-    public Collection getEndpoints() {
-        return this.endpoints.values();
-    }
-    
-    public void addEndpoint(Endpoint endpoint) {
-        String key = EndpointSupport.getKey(endpoint);
-        if (this.endpoints.put(key, endpoint) != null) {
-            throw new IllegalStateException("More than one endpoint found in 
the SU for key: " + key);
-        }
-    }
-    
-    public Endpoint getEndpoint(String key) {
-        return (Endpoint) this.endpoints.get(key);
-    }
+       protected BaseComponent component;
+
+       protected String name;
+
+       protected String rootPath;
+
+       protected String status = LifeCycleMBean.SHUTDOWN;
 
+       protected Map endpoints = new HashMap();
+       /**
+        * Creates a service unit that is not yet attached to a component.
+        */
+       public ServiceUnit() {
+       }
+       /**
+        * Creates a service unit attached to the given component.
+        * 
+        * @param component
+        *            the component this service unit is attached to
+        */
+       public ServiceUnit(BaseComponent component) {
+               this.component = component;
+       }
+       /**
+        * Starts this service unit by activating each of its endpoints in turn.
+        * <p>
+        * When every activation succeeds the status becomes
+        * LifeCycleMBean.STARTED. If an exception is thrown, the endpoints
+        * activated so far are deactivated again (exceptions raised by those
+        * deactivations are ignored), the status is left unchanged and the
+        * original exception is rethrown.
+        * 
+        * @throws Exception
+        *             the exception that interrupted the start
+        */
+       public void start() throws Exception {
+               // Activate endpoints
+               List activated = new ArrayList();
+               try {
+                       for (Iterator iter = getEndpoints().iterator(); 
iter.hasNext();) {
+                               Endpoint endpoint = (Endpoint) iter.next();
+                               endpoint.activate();
+                               activated.add(endpoint);
+                       }
+                       this.status = LifeCycleMBean.STARTED;
+               } catch (Exception e) {
+                       // Deactivate activated endpoints
+                       for (Iterator iter = activated.iterator(); 
iter.hasNext();) {
+                               try {
+                                       Endpoint endpoint = (Endpoint) 
iter.next();
+                                       endpoint.deactivate();
+                               } catch (Exception e2) {
+                                       // do nothing
+                               }
+                       }
+                       throw e;
+               }
+       }
+       /**
+        * Stops this service unit by deactivating all of its endpoints.
+        * <p>
+        * The status is set to LifeCycleMBean.STOPPED before any endpoint is
+        * touched. A failing endpoint does not prevent the remaining ones from
+        * being deactivated; once all have been tried, the last exception
+        * caught, if any, is rethrown (earlier ones are discarded).
+        * 
+        * @throws Exception
+        *             the last exception raised by an endpoint deactivation
+        */
+       public void stop() throws Exception {
+               this.status = LifeCycleMBean.STOPPED;
+               // Deactivate endpoints
+               Exception exception = null;
+               for (Iterator iter = getEndpoints().iterator(); 
iter.hasNext();) {
+                       Endpoint endpoint = (Endpoint) iter.next();
+                       try {
+                               endpoint.deactivate();
+                       } catch (Exception e) {
+                               exception = e;
+                       }
+               }
+               if (exception != null) {
+                       throw exception;
+               }
+       }
+       /**
+        * Shuts this service unit down. This implementation only sets the
+        * status to LifeCycleMBean.SHUTDOWN.
+        * 
+        * @throws JBIException
+        *             never thrown by this implementation
+        */
+       public void shutDown() throws JBIException {
+               this.status = LifeCycleMBean.SHUTDOWN;
+       }
+       /**
+        * Returns the current life cycle status of this service unit.
+        * 
+        * @return the status last set by start, stop or shutDown; initially
+        *         LifeCycleMBean.SHUTDOWN
+        */
+       public String getCurrentState() {
+               return status;
+       }
+       /**
+        * Returns the name of this service unit.
+        * 
+        * @return the name, or null if it has not been set
+        */
+       public String getName() {
+               return name;
+       }
+       /**
+        * Sets the name of this service unit.
+        * 
+        * @param name
+        *            the name to set
+        */
+       public void setName(String name) {
+               this.name = name;
+       }
+       /**
+        * Returns the root path of this service unit.
+        * 
+        * @return the root path, or null if it has not been set
+        */
+       public String getRootPath() {
+               return rootPath;
+       }
+       /**
+        * Sets the root path of this service unit.
+        * 
+        * @param rootPath
+        *            the root path to set
+        */
+       public void setRootPath(String rootPath) {
+               this.rootPath = rootPath;
+       }
+
+       /**
+        * Returns the component this service unit is attached to.
+        * 
+        * @return the component given to the constructor or to setComponent,
+        *         or null if none has been set
+        */
+       public BaseComponent getComponent() {
+               return component;
+       }
+
+       /**
+        * Attaches this service unit to a component, replacing any component
+        * set previously.
+        * 
+        * @param component
+        *            The component to set.
+        */
+       public void setComponent(BaseComponent component) {
+               this.component = component;
+       }
+       /**
+        * Returns the endpoints of this service unit.
+        * 
+        * @return the values of the internal endpoints map; this is the map's
+        *         own view, not a copy
+        */
+       public Collection getEndpoints() {
+               return this.endpoints.values();
+       }
+       /**
+        * Adds an endpoint to this service unit under the key computed by
+        * EndpointSupport.getKey.
+        * <p>
+        * The endpoint is stored before the duplicate check is made, so when
+        * the exception below is thrown the new endpoint has already replaced
+        * the one previously stored under that key.
+        * 
+        * @param endpoint
+        *            the endpoint to add
+        * @throws IllegalStateException
+        *             if an endpoint was already stored under the same key
+        */
+       public void addEndpoint(Endpoint endpoint) {
+               String key = EndpointSupport.getKey(endpoint);
+               if (this.endpoints.put(key, endpoint) != null) {
+                       throw new IllegalStateException(
+                                       "More than one endpoint found in the SU 
for key: " + key);
+               }
+       }
+       /**
+        * Looks up an endpoint of this service unit by its key.
+        * 
+        * @param key
+        *            the endpoint key, as used by addEndpoint
+        * @return the endpoint stored under the key, or null if there is none
+        */
+       public Endpoint getEndpoint(String key) {
+               return (Endpoint) this.endpoints.get(key);
+       }
+       /**
+        * Returns the class loader to use for the classes provided by this
+        * service unit. AsyncBaseLifeCycle installs a non-null result as the
+        * thread context class loader while an exchange is processed by one of
+        * this unit's endpoints.
+        * <p>
+        * This base implementation always returns null. Presumably subclasses
+        * that ship Java classes override it to return their own class loader.
+        * 
+        * @return the class loader of this service unit, or null if it has none
+        * @throws ServiceNotFoundException
+        *             never thrown by this implementation; declared for
+        *             overriding implementations
+        */
+       public ClassLoader getConfigurationClassLoader()
+                       throws ServiceNotFoundException {
+               return null;
+       }
 
 }


Reply via email to