Modified: incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java?view=diff&rev=448019&r1=448018&r2=448019 ============================================================================== --- incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java (original) +++ incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java Tue Sep 19 17:42:16 2006 @@ -19,7 +19,7 @@ package org.apache.ode.daohib.bpel.hobj; /** - * @hibernate.subclass table="BPEL_CORRELATOR_ENTRY" discriminator-value="S" + * @hibernate.class table="BPEL_SELECTORS" * */ public class HCorrelatorSelector extends HCorrelatorEntry { @@ -30,6 +30,10 @@ private int _idx; + private HCorrelator _correlator; + + private String _correlationKey; + /** * @hibernate.many-to-one column="PIID" */ @@ -54,7 +58,7 @@ } /** - * @hibernate.property column="IDX" + * @hibernate.property column="IDX" not-null="true" */ public int getIndex() { return _idx; @@ -63,4 +67,37 @@ public void setIndex(int idx) { _idx = idx; } + + @Override + public String toString() { + return "{HCorrelatorSelector correlator=" + this.getCorrelator() + ", ckey=" + getCorrelationKey() + + ", groupId=" + getGroupId() + ", idx=" + getIndex() + ", iid=" + getInstance().getId() + "}"; + } + + /** + * @hibernate.property column="CORRELATION_KEY" + * @hibernate.column name="CORRELATION_KEY" + * index="IDX_SELECTOR_CKEY" + * + */ + public String getCorrelationKey() { + return _correlationKey; + } + + public void setCorrelationKey(String correlationKey) { + _correlationKey = correlationKey; + } + + /** + * @hibernate.many-to-one + * @hibernate.column name="CORRELATOR" index="IDX_SELECTOR_CORRELATOR" + */ + public HCorrelator getCorrelator() { + return _correlator; + } + + public void 
setCorrelator(HCorrelator correlator) { + _correlator = correlator; + } + }
Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java?view=diff&rev=448019&r1=448018&r2=448019 ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java Tue Sep 19 17:42:16 2006 @@ -131,12 +131,6 @@ continuation = (Continuation)it.next(); it.remove(); } - // At this point it is wise to clone the continuation, so that we do not have weird - // concurrency issues. We only clone the closure, the arguments should not be - // a problem. -// Continuation clone = new Continuation(cloneClosure(continuation.getClosure()), continuation.getMethod(), continuation.getArgs()); -// clone.setDescription(continuation.getDescription()); -// return clone; return continuation; } Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=448019&r1=448018&r2=448019 ============================================================================== --- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original) +++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Tue Sep 19 17:42:16 2006 @@ -40,7 +40,7 @@ * type handles all communications initiated by ODE that is destined for other * JBI providers. 
*/ -class OdeConsumer implements JbiMessageExchangeProcessor { +class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor { private static final Log __log = LogFactory.getLog(OdeConsumer.class); private OdeContext _ode; Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java?view=diff&rev=448019&r1=448018&r2=448019 ============================================================================== --- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java (original) +++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java Tue Sep 19 17:42:16 2006 @@ -32,6 +32,27 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.util.HashMap; +import java.util.Map; + +import javax.jbi.JBIException; +import javax.jbi.messaging.ExchangeStatus; +import javax.jbi.messaging.Fault; +import javax.jbi.messaging.InOnly; +import javax.jbi.messaging.InOut; +import javax.jbi.messaging.MessagingException; +import javax.jbi.messaging.NormalizedMessage; +import javax.jbi.servicedesc.ServiceEndpoint; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern; +import org.apache.ode.bpel.iapi.MessageExchange.Status; +import org.apache.ode.jbi.msgmap.Mapper; +import org.apache.ode.jbi.msgmap.MessageTranslationException; import org.apache.ode.bpel.iapi.Endpoint; import org.apache.ode.bpel.iapi.Message; import org.apache.ode.bpel.iapi.MyRoleMessageExchange; @@ -44,314 +65,309 @@ /** * Bridge JBI (consumer) to ODE (provider). 
*/ -public class OdeService implements JbiMessageExchangeProcessor { +public class OdeService extends ServiceBridge implements JbiMessageExchangeProcessor { - private static final Log __log = LogFactory.getLog(OdeService.class); + private static final Log __log = LogFactory.getLog(OdeService.class); - /** utility for tracking outstanding JBI message exchanges. */ - private final JbiMexTracker _jbiMexTracker = new JbiMexTracker(); - - /** JBI-Generated Endpoint */ - private ServiceEndpoint _internal; - - /** External endpoint. */ - private ServiceEndpoint _external; - - private OdeContext _ode; - - private Element _serviceref; - - private Endpoint _endpoint; - - - public OdeService(OdeContext odeContext, Endpoint endpoint) - throws Exception { - _ode = odeContext; - _endpoint = endpoint; - } - - /** - * Do the JBI endpoint activation. - * - * @throws JBIException - */ - public void activate() throws JBIException { - if (_serviceref == null) { - ServiceEndpoint[] candidates = _ode.getContext().getExternalEndpointsForService(_endpoint.serviceName); - if (candidates.length != 0) { - _external = candidates[0]; - } - } - _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, _endpoint.portName); - if (__log.isDebugEnabled()) { - __log.debug("Activated endpoint " + _endpoint); - } - // TODO: Is there a race situation here? - } - - /** - * Deactivate endpoints in JBI. 
- */ - public void deactivate() throws JBIException { - _ode.getContext().deactivateEndpoint(_internal); - __log.debug("Dectivated endpoint " + _endpoint); - } - - public ServiceEndpoint getInternalServiceEndpoint() { - return _internal; - } - - public ServiceEndpoint getExternalServiceEndpoint() { - return _external; - } - - - public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException { - if (jbiMex.getRole() != javax.jbi.messaging.MessageExchange.Role.PROVIDER) { - String errmsg ="Message exchange is not in PROVIDER role as expected: " + jbiMex.getExchangeId(); - __log.fatal(errmsg); - throw new IllegalArgumentException(errmsg); - } - - if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) { - // We can forget about the exchange. - _jbiMexTracker.consume(jbiMex.getExchangeId()); - return; - } - - if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) { - boolean success = false; - Exception err = null; - try { - invokeOde(jbiMex, ((InOnly)jbiMex).getInMessage()); - success = true; - } catch (Exception ex) { - __log.error("Error invoking ODE.",ex); - err = ex; - } finally { - if (!success) { - jbiMex.setStatus(ExchangeStatus.ERROR); - jbiMex.setError(err); - } else { - jbiMex.setStatus(ExchangeStatus.DONE); - } - } - } else if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) { - boolean success = false; - Exception err = null; - try { - invokeOde(jbiMex, ((InOut)jbiMex).getInMessage()); - success = true; - } catch (Exception ex) { - __log.error("Error invoking ODE.", ex); - err = ex; - } finally { - if (!success) { - jbiMex.setError(err); - jbiMex.setStatus(ExchangeStatus.ERROR); - } - } - } else { - __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " - + jbiMex.getPattern()); - jbiMex.setStatus(ExchangeStatus.ERROR); - jbiMex.setError(new Exception("Unknown message exchange pattern: " + jbiMex.getPattern())); - } - - } - - 
/** - * Called from {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)} - * @param mex message exchenge - */ - public void onResponse(MyRoleMessageExchange mex) { - javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.consume(mex.getClientId()); - if (jbiMex == null) { - __log.warn("Ignoring unknown async reply: " + mex); - return; - } - - switch (mex.getStatus()) { - case FAULT: - outResponseFault(mex, jbiMex); - break; - case RESPONSE: - outResponse(mex, jbiMex); - break; - case FAILURE: - outFailure(mex, jbiMex); - break; - default : - __log.warn("Received ODE message exchange in unexpected state: " + mex.getStatus()); - } - } - - /** - * Forward a JBI input message to ODE. - * - * @param jbiMex - */ - private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, - NormalizedMessage request) throws Exception { - - // If this has already been tracked, we will not invoke! - if (_jbiMexTracker.track(jbiMex)) { - __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", already received!"); - return; - } - - _ode.getTransactionManager().begin(); - - boolean success = false; - MyRoleMessageExchange odeMex = null; - try { - if (__log.isDebugEnabled()) { - __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint=" + _endpoint + " operation=" + jbiMex.getOperation() ); - } - odeMex = _ode._server.getEngine().createMessageExchange( - jbiMex.getExchangeId(), - _endpoint.serviceName, - jbiMex.getOperation().getLocalPart()); - - if (odeMex.getOperation() != null) { - javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage(); - Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName()); - Mapper mapper = _ode.findMapper(request,odeMex.getOperation()); - if (mapper == null) { - String errmsg = "Could not find a mapper for request message for JBI MEX " + jbiMex.getExchangeId() - + "; ODE MEX " + odeMex.getMessageExchangeId() + " is failed. 
"; - __log.error(errmsg); - throw new MessageTranslationException(errmsg); - - } - odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName()); - mapper.toODE(odeRequest, request, msgdef); - odeMex.invoke(odeRequest); - - // Handle the response if it is immediately available. - if (odeMex.getStatus() != Status.ASYNC) { - __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY."); - onResponse(odeMex); - _jbiMexTracker.consume(jbiMex.getExchangeId()); + /** utility for tracking outstanding JBI message exchanges. */ + private final JbiMexTracker _jbiMexTracker = new JbiMexTracker(); + + /** JBI-Generated Endpoint */ + private ServiceEndpoint _internal; + + /** External endpoint. */ + private ServiceEndpoint _external; + + private OdeContext _ode; + + private Element _serviceref; + + private Endpoint _endpoint; + + public OdeService(OdeContext odeContext, Endpoint endpoint) throws Exception { + _ode = odeContext; + _endpoint = endpoint; + } + + /** + * Do the JBI endpoint activation. + * + * @throws JBIException + */ + public void activate() throws JBIException { + if (_serviceref == null) { + ServiceEndpoint[] candidates = _ode.getContext().getExternalEndpointsForService(_endpoint.serviceName); + if (candidates.length != 0) { + _external = candidates[0]; + } + } + _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, _endpoint.portName); + if (__log.isDebugEnabled()) { + __log.debug("Activated endpoint " + _endpoint); + } + // TODO: Is there a race situation here? + } + + /** + * Deactivate endpoints in JBI. 
+ */ + public void deactivate() throws JBIException { + _ode.getContext().deactivateEndpoint(_internal); + __log.debug("Dectivated endpoint " + _endpoint); + } + + public ServiceEndpoint getInternalServiceEndpoint() { + return _internal; + } + + public ServiceEndpoint getExternalServiceEndpoint() { + return _external; + } + + public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange jbiMex) throws MessagingException { + if (jbiMex.getRole() != javax.jbi.messaging.MessageExchange.Role.PROVIDER) { + String errmsg = "Message exchange is not in PROVIDER role as expected: " + jbiMex.getExchangeId(); + __log.fatal(errmsg); + throw new IllegalArgumentException(errmsg); + } + + if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) { + // We can forget about the exchange. + _jbiMexTracker.consume(jbiMex.getExchangeId()); + return; + } + + if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) { + boolean success = false; + Exception err = null; + try { + invokeOde(jbiMex, ((InOnly) jbiMex).getInMessage()); + success = true; + } catch (Exception ex) { + __log.error("Error invoking ODE.", ex); + err = ex; + } finally { + if (!success) { + jbiMex.setStatus(ExchangeStatus.ERROR); + jbiMex.setError(err); + } else { + jbiMex.setStatus(ExchangeStatus.DONE); + } + } + } else if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) { + boolean success = false; + Exception err = null; + try { + invokeOde(jbiMex, ((InOut) jbiMex).getInMessage()); + success = true; + } catch (Exception ex) { + __log.error("Error invoking ODE.", ex); + err = ex; + } finally { + if (!success) { + jbiMex.setError(err); + jbiMex.setStatus(ExchangeStatus.ERROR); + } + } } else { - __log.debug("ODE MEX " + odeMex + " completed ASYNCHRONOUSLY."); + __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + + jbiMex.getPattern()); + jbiMex.setStatus(ExchangeStatus.ERROR); + jbiMex.setError(new Exception("Unknown 
message exchange pattern: " + jbiMex.getPattern())); } - } else { - __log.error("ODE MEX " +odeMex + " was unroutable."); - jbiMex.setError(new IllegalArgumentException("Unroutable invocation.")); - } - - success = true; - // For one-way invocation we do not need to maintain the association - if (odeMex.getMessageExchangePattern() != MessageExchangePattern.REQUEST_RESPONSE) - _jbiMexTracker.consume(jbiMex.getExchangeId()); - - } finally { - if (success) { - __log.debug("Commiting ODE MEX " + odeMex ); - _ode.getTransactionManager().commit(); - } else { - __log.debug("Rolling back ODE MEX " + odeMex ); - _jbiMexTracker.consume(jbiMex.getExchangeId()); - _ode.getTransactionManager().rollback(); - - } - } - - } - - private void outFailure(MyRoleMessageExchange odeMex, javax.jbi.messaging.MessageExchange jbiMex) { - try { - jbiMex.setError(new Exception("MEXFailure")); - jbiMex.setStatus(ExchangeStatus.ERROR); - // TODO: get failure codes out of the message. - } catch (MessagingException ex) { - __log.fatal("Error bridging ODE out response: ", ex); - } - } - - private void outResponse(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) { - InOut inout = (InOut)jbiMex; - - try { - NormalizedMessage nmsg = inout.createMessage(); - String mapperName = mex.getProperty(Mapper.class.getName()); - Mapper mapper = _ode.getMapper(mapperName); - if (mapper == null) { - String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " - + mex.getMessageExchangeId() + " is no longer available."; - __log.error(errmsg); - throw new MessageTranslationException(errmsg); - } - - mapper.toNMS(nmsg,mex.getResponse(), - mex.getOperation().getOutput().getMessage()); - - inout.setOutMessage(nmsg); - _ode.getChannel().send(inout); - - } catch (MessagingException ex) { - __log.error("Error bridging ODE out response: ", ex); - inout.setError(ex); - } catch (MessageTranslationException e) { - __log.error("Error translating ODE message " + mex.getResponse() - + " to NMS 
format!", e); - inout.setError(e); - } - } - - private void outResponseFault(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) { - - InOut inout = (InOut)jbiMex; - - try { - Fault flt = inout.createFault(); - String mapperName = mex.getProperty(Mapper.class.getName()); - Mapper mapper = _ode.getMapper(mapperName); - if (mapper == null) { - String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " - + mex.getMessageExchangeId() + " is no longer available."; - __log.error(errmsg); - throw new MessageTranslationException(errmsg); - } - - mapper.toNMS(flt,mex.getResponse(), - mex.getOperation().getOutput().getMessage()); - inout.setFault(flt); - _ode.getChannel().send(inout); - } catch (MessagingException e) { - __log.error("Error bridging ODE fault response: ", e); - inout.setError(e); - } catch (MessageTranslationException mte) { - __log.error("Error translating ODE fault message " + mex.getFaultResponse() - + " to NMS format!", mte); - inout.setError(mte); - } - } - - public Endpoint getEndpoint() { - return _endpoint; - } - - - /** - * Class for tracking outstanding message exchanges from JBI. 
- */ - private static class JbiMexTracker { - /** Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI MEX */ - private Map<String, javax.jbi.messaging.MessageExchange> _outstandingJbiExchanges = - new HashMap<String, javax.jbi.messaging.MessageExchange>(); - - synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) { - boolean found = _outstandingJbiExchanges.containsKey(jbiMex.getExchangeId()); - _outstandingJbiExchanges.put(jbiMex.getExchangeId(), jbiMex); - return found; - } - - synchronized javax.jbi.messaging.MessageExchange consume(String clientId) { - return _outstandingJbiExchanges.remove(clientId); - } - - - } + + } + + /** + * Called from + * {@link MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)} + * + * @param mex + * message exchenge + */ + public void onResponse(MyRoleMessageExchange mex) { + javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.consume(mex.getClientId()); + if (jbiMex == null) { + __log.warn("Ignoring unknown async reply: " + mex); + return; + } + + switch (mex.getStatus()) { + case FAULT: + outResponseFault(mex, jbiMex); + break; + case RESPONSE: + outResponse(mex, jbiMex); + break; + case FAILURE: + outFailure(mex, jbiMex); + break; + default: + __log.warn("Received ODE message exchange in unexpected state: " + mex.getStatus()); + } + } + + /** + * Forward a JBI input message to ODE. + * + * @param jbiMex + */ + private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, NormalizedMessage request) throws Exception { + + // If this has already been tracked, we will not invoke! 
+ if (_jbiMexTracker.track(jbiMex)) { + __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", already received!"); + return; + } + + _ode.getTransactionManager().begin(); + + boolean success = false; + MyRoleMessageExchange odeMex = null; + try { + if (__log.isDebugEnabled()) { + __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint=" + _endpoint + + " operation=" + jbiMex.getOperation()); + } + odeMex = _ode._server.getEngine().createMessageExchange(jbiMex.getExchangeId(), _endpoint.serviceName, + jbiMex.getOperation().getLocalPart()); + + if (odeMex.getOperation() != null) { + copyMexProperties(odeMex, jbiMex); + javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage(); + Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName()); + Mapper mapper = _ode.findMapper(request, odeMex.getOperation()); + if (mapper == null) { + String errmsg = "Could not find a mapper for request message for JBI MEX " + jbiMex.getExchangeId() + + "; ODE MEX " + odeMex.getMessageExchangeId() + " is failed. "; + __log.error(errmsg); + throw new MessageTranslationException(errmsg); + + } + odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName()); + mapper.toODE(odeRequest, request, msgdef); + odeMex.invoke(odeRequest); + + // Handle the response if it is immediately available. 
+ if (odeMex.getStatus() != Status.ASYNC) { + __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY."); + onResponse(odeMex); + _jbiMexTracker.consume(jbiMex.getExchangeId()); + } else { + __log.debug("ODE MEX " + odeMex + " completed ASYNCHRONOUSLY."); + } + } else { + __log.error("ODE MEX " + odeMex + " was unroutable."); + jbiMex.setError(new IllegalArgumentException("Unroutable invocation.")); + } + + success = true; + // For one-way invocation we do not need to maintain the association + if (odeMex.getMessageExchangePattern() != MessageExchangePattern.REQUEST_RESPONSE) + _jbiMexTracker.consume(jbiMex.getExchangeId()); + + } finally { + if (success) { + __log.debug("Commiting ODE MEX " + odeMex); + _ode.getTransactionManager().commit(); + } else { + __log.debug("Rolling back ODE MEX " + odeMex); + _jbiMexTracker.consume(jbiMex.getExchangeId()); + _ode.getTransactionManager().rollback(); + + } + } + + } + + private void outFailure(MyRoleMessageExchange odeMex, javax.jbi.messaging.MessageExchange jbiMex) { + try { + jbiMex.setError(new Exception("MEXFailure")); + jbiMex.setStatus(ExchangeStatus.ERROR); + // TODO: get failure codes out of the message. 
+ } catch (MessagingException ex) { + __log.fatal("Error bridging ODE out response: ", ex); + } + } + + private void outResponse(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) { + InOut inout = (InOut) jbiMex; + + try { + NormalizedMessage nmsg = inout.createMessage(); + String mapperName = mex.getProperty(Mapper.class.getName()); + Mapper mapper = _ode.getMapper(mapperName); + if (mapper == null) { + String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " + mex.getMessageExchangeId() + + " is no longer available."; + __log.error(errmsg); + throw new MessageTranslationException(errmsg); + } + + mapper.toNMS(nmsg, mex.getResponse(), mex.getOperation().getOutput().getMessage()); + + inout.setOutMessage(nmsg); + _ode.getChannel().send(inout); + + } catch (MessagingException ex) { + __log.error("Error bridging ODE out response: ", ex); + inout.setError(ex); + } catch (MessageTranslationException e) { + __log.error("Error translating ODE message " + mex.getResponse() + " to NMS format!", e); + inout.setError(e); + } + } + + private void outResponseFault(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) { + + InOut inout = (InOut) jbiMex; + + try { + Fault flt = inout.createFault(); + String mapperName = mex.getProperty(Mapper.class.getName()); + Mapper mapper = _ode.getMapper(mapperName); + if (mapper == null) { + String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " + mex.getMessageExchangeId() + + " is no longer available."; + __log.error(errmsg); + throw new MessageTranslationException(errmsg); + } + + mapper.toNMS(flt, mex.getResponse(), mex.getOperation().getOutput().getMessage()); + inout.setFault(flt); + _ode.getChannel().send(inout); + } catch (MessagingException e) { + __log.error("Error bridging ODE fault response: ", e); + inout.setError(e); + } catch (MessageTranslationException mte) { + __log.error("Error translating ODE fault message " + mex.getFaultResponse() + " to NMS format!", 
mte); + inout.setError(mte); + } + } + + public Endpoint getEndpoint() { + return _endpoint; + } + + /** + * Class for tracking outstanding message exchanges from JBI. + */ + private static class JbiMexTracker { + /** + * Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI + * MEX + */ + private Map<String, javax.jbi.messaging.MessageExchange> _outstandingJbiExchanges = new HashMap<String, javax.jbi.messaging.MessageExchange>(); + + synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) { + boolean found = _outstandingJbiExchanges.containsKey(jbiMex.getExchangeId()); + _outstandingJbiExchanges.put(jbiMex.getExchangeId(), jbiMex); + return found; + } + + synchronized javax.jbi.messaging.MessageExchange consume(String clientId) { + return _outstandingJbiExchanges.remove(clientId); + } + + } } Added: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java?view=auto&rev=448019 ============================================================================== --- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java (added) +++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java Tue Sep 19 17:42:16 2006 @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.jbi; + +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; + +/** + * Base-class for classes providing JBI-ODE translation services. + * + * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m + * + */ +public class ServiceBridge { + + private static final Log __log = LogFactory.getLog(ServiceBridge.class); + + /** + * Transfer message properties from the ODE message exchange to the JBI message exchange object. + * + * @param jbiMex destination JBI message-exchange + * @param odeMex source ODE message-exchange + */ + protected void copyMexProperties(javax.jbi.messaging.MessageExchange jbiMex, MyRoleMessageExchange odeMex) { + for (String propName : odeMex.getPropertyNames()) { + String val = odeMex.getProperty(propName); + if (val != null) { + jbiMex.setProperty(propName, val); + __log.debug(jbiMex + ": set property " + propName + " = " + val); + } + } + } + + /** + * Transfer message properties from the JBI message exchange to the ODE message exchange object. 
+ * + * @param odeMex destination ODE message-exchange + * @param jbiMex source JBI message-exchange + */ + @SuppressWarnings("unchecked") + protected void copyMexProperties(MyRoleMessageExchange odeMex, javax.jbi.messaging.MessageExchange jbiMex) { + for (String propName : (Set<String>) jbiMex.getPropertyNames()) { + if (propName.startsWith("org.apache.ode")) { + // Handle ODE-specific properties + Object val = jbiMex.getProperty(propName); + if (val != null) { + String sval = val.toString(); + odeMex.setProperty(propName, sval); + __log.debug(odeMex + ": set property " + propName + " = " + sval); + } + } else { + // Non ODE-specific properties, + // TODO: Should we copy these? + } + } + } + + +} Propchange: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java ------------------------------------------------------------------------------ svn:eol-style = native
