Author: pdodds
Date: Thu Sep 21 07:31:41 2006
New Revision: 448564
URL: http://svn.apache.org/viewvc?view=rev&rev=448564
Log:
Refactored to add support for handling classloaders from within the
ServiceUnit. If a ServiceUnit implementation is going to provide Java classes,
it should return a classloader for that SU (see SM-591).
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?view=diff&rev=448564&r1=448563&r2=448564
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
Thu Sep 21 07:31:41 2006
@@ -44,8 +44,8 @@
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
- * Base class for life cycle management of components.
- * This class may be used as is.
+ * Base class for life cycle management of components. This class may be used
as
+ * is.
*
* @author Guillaume Nodet
* @version $Revision: 399873 $
@@ -53,421 +53,495 @@
*/
public class AsyncBaseLifeCycle implements ComponentLifeCycle {
- protected final transient Log logger;
-
- protected BaseComponent component;
- protected ComponentContext context;
- protected ObjectName mbeanName;
- protected WorkManager workManager;
- protected AtomicBoolean running;
- protected DeliveryChannel channel;
- protected Thread poller;
- protected AtomicBoolean polling;
- protected TransactionManager transactionManager;
- protected boolean workManagerCreated;
- protected Map processors = new ConcurrentHashMap();
-
-
- public AsyncBaseLifeCycle(BaseComponent component) {
- this.component = component;
- this.logger = component.logger;
- this.running = new AtomicBoolean(false);
- this.polling = new AtomicBoolean(false);
- this.processors = new ConcurrentHashMap();
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
- */
- public ObjectName getExtensionMBeanName() {
- return mbeanName;
- }
-
- protected Object getExtensionMBean() throws Exception {
- return null;
- }
-
- protected ObjectName createExtensionMBeanName() throws Exception {
- return
this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
- }
-
- protected QName getEPRServiceName() {
- return null;
- }
-
- /* (non-Javadoc)
- * @see
javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
- */
- public void init(ComponentContext context) throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Initializing component");
- }
- this.context = context;
- this.channel = context.getDeliveryChannel();
- try {
- this.transactionManager = (TransactionManager)
context.getTransactionManager();
- } catch (Throwable e) {
- // Ignore, this is just a safeguard against non compliant
- // JBI implementation which throws an exception instead of
- // return null
- }
- doInit();
- if (logger.isDebugEnabled()) {
- logger.debug("Component initialized");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling init", e);
- }
- }
-
- protected void doInit() throws Exception {
- // Register extension mbean
- Object mbean = getExtensionMBean();
- if (mbean != null) {
- MBeanServer server = this.context.getMBeanServer();
- if (server == null) {
- // TODO: log a warning ?
- //throw new JBIException("null mBeanServer");
- } else {
- this.mbeanName = createExtensionMBeanName();
- if (server.isRegistered(this.mbeanName)) {
- server.unregisterMBean(this.mbeanName);
- }
- server.registerMBean(mbean, this.mbeanName);
- }
- }
- // Obtain or create the work manager
- // When using the WorkManager from ServiceMix,
- // some class loader problems can appear when
- // trying to uninstall the components.
- // Some threads owned by the work manager have a
- // security context referencing the component class loader
- // so that every loaded classes are locked
- //this.workManager = findWorkManager();
- if (this.workManager == null) {
- this.workManagerCreated = true;
- this.workManager = createWorkManager();
- }
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#shutDown()
- */
- public void shutDown() throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Shutting down component");
- }
- doShutDown();
- this.context = null;
- if (logger.isDebugEnabled()) {
- logger.debug("Component shut down");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling shutdown", e);
- }
- }
-
- protected void doShutDown() throws Exception {
- // Unregister mbean
- if (this.mbeanName != null) {
- MBeanServer server = this.context.getMBeanServer();
- if (server == null) {
- throw new JBIException("null mBeanServer");
- }
- if (server.isRegistered(this.mbeanName)) {
- server.unregisterMBean(this.mbeanName);
- }
- }
- // Destroy work manager, if created
- if (this.workManagerCreated) {
- if (this.workManager instanceof BasicWorkManager) {
- ((BasicWorkManager) this.workManager).shutDown();
- }
- this.workManager = null;
- }
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#start()
- */
- public void start() throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Starting component");
- }
- if (this.running.compareAndSet(false, true)) {
- doStart();
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Component started");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling start", e);
- }
- }
-
- protected void doStart() throws Exception {
- synchronized (this.polling) {
- workManager.startWork(new Work() {
- public void release() { }
- public void run() {
- poller = Thread.currentThread();
- pollDeliveryChannel();
- }
- });
- polling.wait();
- }
- }
-
- protected void pollDeliveryChannel() {
- synchronized (polling) {
- polling.set(true);
- polling.notify();
- }
- while (running.get()) {
- try {
- final MessageExchange exchange = channel.accept(1000L);
- if (exchange != null) {
- final Transaction tx = (Transaction)
exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
- if (tx != null) {
- if (transactionManager == null) {
- throw new IllegalStateException("Exchange is
enlisted in a transaction, but no transaction manager is available");
- }
- transactionManager.suspend();
- }
- workManager.scheduleWork(new Work() {
- public void release() {
- }
- public void run() {
- processExchangeInTx(exchange, tx);
- }
- });
- }
- } catch (Throwable t) {
- if (running.get() == false) {
- // Should have been interrupted, discard the throwable
- if (logger.isDebugEnabled()) {
- logger.debug("Polling thread will stop");
- }
- } else {
- logger.error("Error polling delivery channel", t);
- }
- }
- }
- synchronized (polling) {
- polling.set(false);
- polling.notify();
- }
- }
-
- /* (non-Javadoc)
- * @see javax.jbi.component.ComponentLifeCycle#stop()
- */
- public void stop() throws JBIException {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Stopping component");
- }
- if (this.running.compareAndSet(true, false)) {
- doStop();
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Component stopped");
- }
- } catch (JBIException e) {
- throw e;
- } catch (Exception e) {
- throw new JBIException("Error calling stop", e);
- }
- }
-
- protected void doStop() throws Exception {
- // Interrupt the polling thread and await termination
- try {
- synchronized (polling) {
- if (polling.get()) {
- poller.interrupt();
- polling.wait();
- }
- }
- } finally {
- poller = null;
- }
- }
-
- /**
- * @return Returns the context.
- */
- public ComponentContext getContext() {
- return context;
- }
-
- public WorkManager getWorkManager() {
- return workManager;
- }
-
- protected WorkManager createWorkManager() {
- // Create a very simple one
- return new BasicWorkManager();
- }
-
- protected WorkManager findWorkManager() {
- // If inside ServiceMix, retrieve its work manager
- try {
- Method getContainerMth =
context.getClass().getMethod("getContainer", new Class[0]);
- Object container = getContainerMth.invoke(context, new Object[0]);
- Method getWorkManagerMth =
container.getClass().getMethod("getWorkManager", new Class[0]);
- return (WorkManager) getWorkManagerMth.invoke(container, new
Object[0]);
- } catch (Throwable t) {
- if (logger.isDebugEnabled()) {
- logger.debug("JBI container is not ServiceMix. Will create our
own WorkManager", t);
- }
- }
- // TODO: should look in jndi for an existing work manager
- return null;
- }
-
- protected void processExchangeInTx(MessageExchange exchange, Transaction
tx) {
- try {
- if (tx != null) {
- transactionManager.resume(tx);
- }
- processExchange(exchange);
- } catch (Exception e) {
- logger.error("Error processing exchange " + exchange, e);
- try {
- // If we are transacted, check if this exception should
- // rollback the transaction
- if (transactionManager != null &&
- transactionManager.getStatus() == Status.STATUS_ACTIVE &&
- exceptionShouldRollbackTx(e)) {
- transactionManager.setRollbackOnly();
- }
- exchange.setError(e);
- channel.send(exchange);
- } catch (Exception inner) {
- logger.error("Error setting exchange status to ERROR", inner);
- }
- } finally {
- try {
- // Check transaction status
- if (tx != null) {
- int status = transactionManager.getStatus();
- // We use pull delivery, so the transaction should already
- // have been transfered to another thread because the
component
- // must have answered.
- if (status != Status.STATUS_NO_TRANSACTION) {
- logger.error("Transaction is still active after
exchange processing. Trying to rollback transaction.");
- try {
- transactionManager.rollback();
- } catch (Throwable t) {
- logger.error("Error trying to rollback
transaction.", t);
- }
- }
- }
- } catch (Throwable t) {
- logger.error("Error checking transaction status.", t);
- }
- }
- }
-
- protected boolean exceptionShouldRollbackTx(Exception e) {
- return false;
- }
-
- public void processExchange(MessageExchange exchange) throws Exception {
- if (logger.isDebugEnabled()) {
- logger.debug("Received exchange: status: " + exchange.getStatus()
+ ", role: " +
- (exchange.getRole() == Role.CONSUMER ?
"consumer" : "provider"));
- }
- if (exchange.getRole() == Role.PROVIDER) {
- boolean dynamic = false;
- ServiceEndpoint endpoint = exchange.getEndpoint();
- String key = EndpointSupport.getKey(exchange.getEndpoint());
- Endpoint ep = (Endpoint)
this.component.getRegistry().getEndpoint(key);
- if (ep == null) {
- if (endpoint.getServiceName().equals(getEPRServiceName())) {
- ep = getResolvedEPR(exchange.getEndpoint());
- dynamic = true;
- }
- if (ep == null) {
- throw new IllegalStateException("Endpoint not found: " +
key);
- }
- }
- ExchangeProcessor processor = ep.getProcessor();
- if (processor == null) {
- throw new IllegalStateException("No processor found for
endpoint: " + key);
- }
- try {
- processor.process(exchange);
- } finally {
- // If the endpoint is dynamic, deactivate it
- if (dynamic) {
- ep.deactivate();
- }
- }
- } else {
- ExchangeProcessor processor = null;
- if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
- String key =
exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
- Endpoint ep = (Endpoint)
this.component.getRegistry().getEndpoint(key);
- if (ep != null) {
- processor = ep.getProcessor();
- }
- } else {
- processor = (ExchangeProcessor)
processors.remove(exchange.getExchangeId());
- }
- if (processor == null) {
- throw new IllegalStateException("No processor found for: " +
exchange.getExchangeId());
- }
- processor.process(exchange);
- }
- }
-
- /**
- *
- * @param exchange
- * @param processor
- * @throws MessagingException
- * @deprecated use sendConsumerExchange(MessageExchange, Endpoint) instead
- */
- public void sendConsumerExchange(MessageExchange exchange,
ExchangeProcessor processor) throws MessagingException {
- // If the exchange is not ACTIVE, no answer is expected
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- processors.put(exchange.getExchangeId(), processor);
- }
- channel.send(exchange);
- }
-
- /**
- * This method allows the component to keep no state in memory so that
- * components can be clustered and provide fail-over and load-balancing.
- * @param exchange
- * @param endpoint
- * @throws MessagingException
- */
- public void sendConsumerExchange(MessageExchange exchange, Endpoint
endpoint) throws MessagingException {
- String key = EndpointSupport.getKey(endpoint);
- exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
- channel.send(exchange);
- }
-
- /**
- * Handle an exchange sent to an EPR resolved by this component
- * @param exchange
- * @return an endpoint to use for handling the exchange
- * @throws Exception
- */
- protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
- throw new UnsupportedOperationException("Component does not handle EPR
exchanges");
- }
+ protected final transient Log logger;
+
+ protected BaseComponent component;
+
+ protected ComponentContext context;
+
+ protected ObjectName mbeanName;
+
+ protected WorkManager workManager;
+
+ protected AtomicBoolean running;
+
+ protected DeliveryChannel channel;
+
+ protected Thread poller;
+
+ protected AtomicBoolean polling;
+
+ protected TransactionManager transactionManager;
+
+ protected boolean workManagerCreated;
+
+ protected Map processors = new ConcurrentHashMap();
+
+ public AsyncBaseLifeCycle(BaseComponent component) {
+ this.component = component;
+ this.logger = component.logger;
+ this.running = new AtomicBoolean(false);
+ this.polling = new AtomicBoolean(false);
+ this.processors = new ConcurrentHashMap();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
+ */
+ public ObjectName getExtensionMBeanName() {
+ return mbeanName;
+ }
+
+ protected Object getExtensionMBean() throws Exception {
+ return null;
+ }
+
+ protected ObjectName createExtensionMBeanName() throws Exception {
+ return
this.context.getMBeanNames().createCustomComponentMBeanName(
+ "Configuration");
+ }
+
+ protected QName getEPRServiceName() {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
+ */
+ public void init(ComponentContext context) throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Initializing component");
+ }
+ this.context = context;
+ this.channel = context.getDeliveryChannel();
+ try {
+ this.transactionManager = (TransactionManager)
context
+ .getTransactionManager();
+ } catch (Throwable e) {
+ // Ignore, this is just a safeguard against non
compliant
+ // JBI implementation which throws an exception
instead of
+ // return null
+ }
+ doInit();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component initialized");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling init", e);
+ }
+ }
+
+ protected void doInit() throws Exception {
+ // Register extension mbean
+ Object mbean = getExtensionMBean();
+ if (mbean != null) {
+ MBeanServer server = this.context.getMBeanServer();
+ if (server == null) {
+ // TODO: log a warning ?
+ // throw new JBIException("null mBeanServer");
+ } else {
+ this.mbeanName = createExtensionMBeanName();
+ if (server.isRegistered(this.mbeanName)) {
+ server.unregisterMBean(this.mbeanName);
+ }
+ server.registerMBean(mbean, this.mbeanName);
+ }
+ }
+ // Obtain or create the work manager
+ // When using the WorkManager from ServiceMix,
+ // some class loader problems can appear when
+ // trying to uninstall the components.
+ // Some threads owned by the work manager have a
+ // security context referencing the component class loader
+ // so that every loaded classes are locked
+ // this.workManager = findWorkManager();
+ if (this.workManager == null) {
+ this.workManagerCreated = true;
+ this.workManager = createWorkManager();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.jbi.component.ComponentLifeCycle#shutDown()
+ */
+ public void shutDown() throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shutting down component");
+ }
+ doShutDown();
+ this.context = null;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component shut down");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling shutdown", e);
+ }
+ }
+
+ protected void doShutDown() throws Exception {
+ // Unregister mbean
+ if (this.mbeanName != null) {
+ MBeanServer server = this.context.getMBeanServer();
+ if (server == null) {
+ throw new JBIException("null mBeanServer");
+ }
+ if (server.isRegistered(this.mbeanName)) {
+ server.unregisterMBean(this.mbeanName);
+ }
+ }
+ // Destroy work manager, if created
+ if (this.workManagerCreated) {
+ if (this.workManager instanceof BasicWorkManager) {
+ ((BasicWorkManager)
this.workManager).shutDown();
+ }
+ this.workManager = null;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.jbi.component.ComponentLifeCycle#start()
+ */
+ public void start() throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting component");
+ }
+ if (this.running.compareAndSet(false, true)) {
+ doStart();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component started");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling start", e);
+ }
+ }
+
+ protected void doStart() throws Exception {
+ synchronized (this.polling) {
+ workManager.startWork(new Work() {
+ public void release() {
+ }
+
+ public void run() {
+ poller = Thread.currentThread();
+ pollDeliveryChannel();
+ }
+ });
+ polling.wait();
+ }
+ }
+
+ protected void pollDeliveryChannel() {
+ synchronized (polling) {
+ polling.set(true);
+ polling.notify();
+ }
+ while (running.get()) {
+ try {
+ final MessageExchange exchange =
channel.accept(1000L);
+ if (exchange != null) {
+ final Transaction tx = (Transaction)
exchange
+
.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
+ if (tx != null) {
+ if (transactionManager == null)
{
+ throw new
IllegalStateException(
+
"Exchange is enlisted in a transaction, but no transaction manager is
available");
+ }
+ transactionManager.suspend();
+ }
+ workManager.scheduleWork(new Work() {
+ public void release() {
+ }
+
+ public void run() {
+
processExchangeInTx(exchange, tx);
+ }
+ });
+ }
+ } catch (Throwable t) {
+ if (running.get() == false) {
+ // Should have been interrupted,
discard the throwable
+ if (logger.isDebugEnabled()) {
+ logger.debug("Polling thread
will stop");
+ }
+ } else {
+ logger.error("Error polling delivery
channel", t);
+ }
+ }
+ }
+ synchronized (polling) {
+ polling.set(false);
+ polling.notify();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see javax.jbi.component.ComponentLifeCycle#stop()
+ */
+ public void stop() throws JBIException {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping component");
+ }
+ if (this.running.compareAndSet(true, false)) {
+ doStop();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Component stopped");
+ }
+ } catch (JBIException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new JBIException("Error calling stop", e);
+ }
+ }
+
+ protected void doStop() throws Exception {
+ // Interrupt the polling thread and await termination
+ try {
+ synchronized (polling) {
+ if (polling.get()) {
+ poller.interrupt();
+ polling.wait();
+ }
+ }
+ } finally {
+ poller = null;
+ }
+ }
+
+ /**
+ * @return Returns the context.
+ */
+ public ComponentContext getContext() {
+ return context;
+ }
+
+ public WorkManager getWorkManager() {
+ return workManager;
+ }
+
+ protected WorkManager createWorkManager() {
+ // Create a very simple one
+ return new BasicWorkManager();
+ }
+
+ protected WorkManager findWorkManager() {
+ // If inside ServiceMix, retrieve its work manager
+ try {
+ Method getContainerMth = context.getClass().getMethod(
+ "getContainer", new Class[0]);
+ Object container = getContainerMth.invoke(context, new
Object[0]);
+ Method getWorkManagerMth =
container.getClass().getMethod(
+ "getWorkManager", new Class[0]);
+ return (WorkManager) getWorkManagerMth.invoke(container,
+ new Object[0]);
+ } catch (Throwable t) {
+ if (logger.isDebugEnabled()) {
+ logger
+ .debug(
+ "JBI container
is not ServiceMix. Will create our own WorkManager",
+ t);
+ }
+ }
+ // TODO: should look in jndi for an existing work manager
+ return null;
+ }
+
+ protected void processExchangeInTx(MessageExchange exchange,
Transaction tx) {
+ try {
+ if (tx != null) {
+ transactionManager.resume(tx);
+ }
+ processExchange(exchange);
+ } catch (Exception e) {
+ logger.error("Error processing exchange " + exchange,
e);
+ try {
+ // If we are transacted, check if this
exception should
+ // rollback the transaction
+ if (transactionManager != null
+ &&
transactionManager.getStatus() == Status.STATUS_ACTIVE
+ &&
exceptionShouldRollbackTx(e)) {
+ transactionManager.setRollbackOnly();
+ }
+ exchange.setError(e);
+ channel.send(exchange);
+ } catch (Exception inner) {
+ logger.error("Error setting exchange status to
ERROR", inner);
+ }
+ } finally {
+ try {
+ // Check transaction status
+ if (tx != null) {
+ int status =
transactionManager.getStatus();
+ // We use pull delivery, so the
transaction should already
+ // have been transfered to another
thread because the
+ // component
+ // must have answered.
+ if (status !=
Status.STATUS_NO_TRANSACTION) {
+ logger
+
.error("Transaction is still active after exchange processing. Trying to
rollback transaction.");
+ try {
+
transactionManager.rollback();
+ } catch (Throwable t) {
+ logger.error(
+ "Error
trying to rollback transaction.", t);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Error checking transaction
status.", t);
+ }
+ }
+ }
+
+ protected boolean exceptionShouldRollbackTx(Exception e) {
+ return false;
+ }
+
+ protected void processExchange(MessageExchange exchange) throws
Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received exchange: status: "
+ + exchange.getStatus()
+ + ", role: "
+ + (exchange.getRole() == Role.CONSUMER
? "consumer"
+ : "provider"));
+ }
+ if (exchange.getRole() == Role.PROVIDER) {
+ boolean dynamic = false;
+ ServiceEndpoint endpoint = exchange.getEndpoint();
+ String key =
EndpointSupport.getKey(exchange.getEndpoint());
+ Endpoint ep = (Endpoint)
this.component.getRegistry().getEndpoint(
+ key);
+ if (ep == null) {
+ if
(endpoint.getServiceName().equals(getEPRServiceName())) {
+ ep =
getResolvedEPR(exchange.getEndpoint());
+ dynamic = true;
+ }
+ if (ep == null) {
+ throw new
IllegalStateException("Endpoint not found: "
+ + key);
+ }
+ }
+ ExchangeProcessor processor = ep.getProcessor();
+ if (processor == null) {
+ throw new IllegalStateException(
+ "No processor found for
endpoint: " + key);
+ }
+ try {
+ doProcess(ep, processor, exchange);
+ } finally {
+ // If the endpoint is dynamic, deactivate it
+ if (dynamic) {
+ ep.deactivate();
+ }
+ }
+ } else {
+ ExchangeProcessor processor = null;
+ Endpoint ep = null;
+ if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT)
!= null) {
+ String key =
exchange.getProperty(JbiConstants.SENDER_ENDPOINT)
+ .toString();
+ ep = (Endpoint) this.component.getRegistry()
+ .getEndpoint(key);
+ if (ep != null) {
+ processor = ep.getProcessor();
+ }
+ } else {
+ processor = (ExchangeProcessor)
processors.remove(exchange
+ .getExchangeId());
+ }
+ if (processor == null) {
+ throw new IllegalStateException("No processor
found for: "
+ + exchange.getExchangeId());
+ }
+ doProcess(ep,processor,exchange);
+ }
+
+ }
+
+ /**
+ * Thin wrapper around the call to the processor to ensure that the
Endpoints
+ * classloader is used where available
+ *
+ */
+ private void doProcess(Endpoint ep, ExchangeProcessor processor,
+ MessageExchange exchange) throws Exception {
+ ClassLoader oldCl =
Thread.currentThread().getContextClassLoader();
+ try {
+ if (ep.getServiceUnit().getConfigurationClassLoader()
!= null) {
+ ClassLoader classLoader = ep.getServiceUnit()
+ .getConfigurationClassLoader();
+
Thread.currentThread().setContextClassLoader(classLoader);
+ }
+
+ processor.process(exchange);
+ } finally {
+ Thread.currentThread().setContextClassLoader(oldCl);
+ }
+
+ }
+
+ /**
+ *
+ * @param exchange
+ * @param processor
+ * @throws MessagingException
+ * @deprecated use sendConsumerExchange(MessageExchange, Endpoint)
instead
+ */
+ public void sendConsumerExchange(MessageExchange exchange,
+ ExchangeProcessor processor) throws MessagingException {
+ // If the exchange is not ACTIVE, no answer is expected
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ processors.put(exchange.getExchangeId(), processor);
+ }
+ channel.send(exchange);
+ }
+
+ /**
+ * This method allows the component to keep no state in memory so that
+ * components can be clustered and provide fail-over and load-balancing.
+ *
+ * @param exchange
+ * @param endpoint
+ * @throws MessagingException
+ */
+ public void sendConsumerExchange(MessageExchange exchange, Endpoint
endpoint)
+ throws MessagingException {
+ String key = EndpointSupport.getKey(endpoint);
+ exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
+ channel.send(exchange);
+ }
+
+ /**
+ * Handle an exchange sent to an EPR resolved by this component
+ *
+ * @param exchange
+ * @return an endpoint to use for handling the exchange
+ * @throws Exception
+ */
+ protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
+ throw new UnsupportedOperationException(
+ "Component does not handle EPR exchanges");
+ }
}
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java?view=diff&rev=448564&r1=448563&r2=448564
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ServiceUnit.java
Thu Sep 21 07:31:41 2006
@@ -26,114 +26,126 @@
import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
+import org.apache.xbean.kernel.ServiceNotFoundException;
+
public class ServiceUnit {
- protected BaseComponent component;
- protected String name;
- protected String rootPath;
- protected String status = LifeCycleMBean.SHUTDOWN;
- protected Map endpoints = new HashMap();
-
- public ServiceUnit() {
- }
-
- public ServiceUnit(BaseComponent component) {
- this.component = component;
- }
-
- public void start() throws Exception {
- // Activate endpoints
- List activated = new ArrayList();
- try {
- for (Iterator iter = getEndpoints().iterator(); iter.hasNext();) {
- Endpoint endpoint = (Endpoint) iter.next();
- endpoint.activate();
- activated.add(endpoint);
- }
- this.status = LifeCycleMBean.STARTED;
- } catch (Exception e) {
- // Deactivate activated endpoints
- for (Iterator iter = activated.iterator(); iter.hasNext();) {
- try {
- Endpoint endpoint = (Endpoint) iter.next();
- endpoint.deactivate();
- } catch (Exception e2) {
- // do nothing
- }
- }
- throw e;
- }
- }
-
- public void stop() throws Exception {
- this.status = LifeCycleMBean.STOPPED;
- // Deactivate endpoints
- Exception exception = null;
- for (Iterator iter = getEndpoints().iterator(); iter.hasNext();) {
- Endpoint endpoint = (Endpoint) iter.next();
- try {
- endpoint.deactivate();
- } catch (Exception e) {
- exception = e;
- }
- }
- if (exception != null) {
- throw exception;
- }
- }
-
- public void shutDown() throws JBIException {
- this.status = LifeCycleMBean.SHUTDOWN;
- }
-
- public String getCurrentState() {
- return status;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getRootPath() {
- return rootPath;
- }
-
- public void setRootPath(String rootPath) {
- this.rootPath = rootPath;
- }
-
- /**
- * @return Returns the component.
- */
- public BaseComponent getComponent() {
- return component;
- }
-
- /**
- * @param component The component to set.
- */
- public void setComponent(BaseComponent component) {
- this.component = component;
- }
-
- public Collection getEndpoints() {
- return this.endpoints.values();
- }
-
- public void addEndpoint(Endpoint endpoint) {
- String key = EndpointSupport.getKey(endpoint);
- if (this.endpoints.put(key, endpoint) != null) {
- throw new IllegalStateException("More than one endpoint found in
the SU for key: " + key);
- }
- }
-
- public Endpoint getEndpoint(String key) {
- return (Endpoint) this.endpoints.get(key);
- }
+ protected BaseComponent component;
+
+ protected String name;
+
+ protected String rootPath;
+
+ protected String status = LifeCycleMBean.SHUTDOWN;
+ protected Map endpoints = new HashMap();
+
+ public ServiceUnit() {
+ }
+
+ public ServiceUnit(BaseComponent component) {
+ this.component = component;
+ }
+
+ public void start() throws Exception {
+ // Activate endpoints
+ List activated = new ArrayList();
+ try {
+ for (Iterator iter = getEndpoints().iterator();
iter.hasNext();) {
+ Endpoint endpoint = (Endpoint) iter.next();
+ endpoint.activate();
+ activated.add(endpoint);
+ }
+ this.status = LifeCycleMBean.STARTED;
+ } catch (Exception e) {
+ // Deactivate activated endpoints
+ for (Iterator iter = activated.iterator();
iter.hasNext();) {
+ try {
+ Endpoint endpoint = (Endpoint)
iter.next();
+ endpoint.deactivate();
+ } catch (Exception e2) {
+ // do nothing
+ }
+ }
+ throw e;
+ }
+ }
+
+ public void stop() throws Exception {
+ this.status = LifeCycleMBean.STOPPED;
+ // Deactivate endpoints
+ Exception exception = null;
+ for (Iterator iter = getEndpoints().iterator();
iter.hasNext();) {
+ Endpoint endpoint = (Endpoint) iter.next();
+ try {
+ endpoint.deactivate();
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ public void shutDown() throws JBIException {
+ this.status = LifeCycleMBean.SHUTDOWN;
+ }
+
+ public String getCurrentState() {
+ return status;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getRootPath() {
+ return rootPath;
+ }
+
+ public void setRootPath(String rootPath) {
+ this.rootPath = rootPath;
+ }
+
+ /**
+ * @return Returns the component.
+ */
+ public BaseComponent getComponent() {
+ return component;
+ }
+
+ /**
+ * @param component
+ * The component to set.
+ */
+ public void setComponent(BaseComponent component) {
+ this.component = component;
+ }
+
+ public Collection getEndpoints() {
+ return this.endpoints.values();
+ }
+
+ public void addEndpoint(Endpoint endpoint) {
+ String key = EndpointSupport.getKey(endpoint);
+ if (this.endpoints.put(key, endpoint) != null) {
+ throw new IllegalStateException(
+ "More than one endpoint found in the SU
for key: " + key);
+ }
+ }
+
+ public Endpoint getEndpoint(String key) {
+ return (Endpoint) this.endpoints.get(key);
+ }
+
+ public ClassLoader getConfigurationClassLoader()
+ throws ServiceNotFoundException {
+ return null;
+ }
}