Author: mszefler Date: Wed Jun 27 11:35:17 2007 New Revision: 551260 URL: http://svn.apache.org/viewvc?view=rev&rev=551260 Log: refactored JBI to support new(est) BART contracts
Removed: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java?view=diff&rev=551260&r1=551259&r2=551260 ============================================================================== --- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java (original) +++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java Wed Jun 27 11:35:17 2007 @@ -19,44 +19,82 @@ package org.apache.ode.jbi; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.ContextException; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.MessageExchangeContext; import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; + /** - * Implementation of the ODE [EMAIL PROTECTED] org.apache.ode.bpel.iapi.MessageExchangeContext} - * interface. This class is used by the ODE engine to make invocation on JBI - * services provided by other engines (i.e. the BPEL engine is acting as - * client/consumer of services). + * Implementation of the ODE [EMAIL PROTECTED] org.apache.ode.bpel.iapi.MessageExchangeContext} interface. This class is used by the ODE engine + * to make invocation on JBI services provided by other engines (i.e. the BPEL engine is acting as client/consumer of services). */ public class MessageExchangeContextImpl implements MessageExchangeContext { - private static final Log __log = LogFactory - .getLog(MessageExchangeContextImpl.class); + private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class); + + private OdeContext _ode; + + /** Supported invocation styles. For now this is fixed. */ + private static final Set<InvocationStyle> __supported; + static { + HashSet<InvocationStyle> supported = new HashSet<InvocationStyle>(); + supported.add(InvocationStyle.BLOCKING); + supported.add(InvocationStyle.ASYNC); + __supported = Collections.unmodifiableSet(supported); + } + + public MessageExchangeContextImpl(OdeContext ode) { + _ode = ode; + } + + public void onAsyncReply(MyRoleMessageExchange myrolemex) throws BpelEngineException { + __log.error("Unexpected onAsyncReply notification: " + myrolemex); + // Due to JBI limitiations (i.e. we cannot recover a JBI message-exchange object) , we don't support ASYNC invocations + } + + public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException { + _ode._consumer.invokePartner(mex); + } + + public void invokePartnerBlocking(PartnerRoleMessageExchange mex) throws ContextException { + _ode._consumer.invokePartner(mex); + } - private OdeContext _ode; + public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException { + throw new ContextException("Unsupported."); + + } + + public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException { + throw new ContextException("Unsupported."); + + } + + public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException { + __log.error("Unexpected onRepliabeReply notification: " + myRoleMex); + + } + + public void cancel(PartnerRoleMessageExchange mex) throws ContextException { + // What can we do in JBI to cancel? --- not much. + + } + + public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) { + return __supported ; + } + - public MessageExchangeContextImpl(OdeContext ode) { - _ode = ode; - } - - public void onAsyncReply(MyRoleMessageExchange myrolemex) - throws BpelEngineException { - OdeService ode = _ode.getService(myrolemex.getServiceName()); - if (ode != null) - ode.onResponse(myrolemex); - else { - __log.error("No active service for message exchange: " + myrolemex); - } - } - - public void invokePartner(PartnerRoleMessageExchange mex) throws ContextException { - _ode._consumer.invokePartner(mex); - } + } Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=551260&r1=551259&r2=551260 ============================================================================== --- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original) +++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Wed Jun 27 11:35:17 2007 @@ -18,40 +18,37 @@ */ package org.apache.ode.jbi; -import org.apache.ode.bpel.iapi.ContextException; -import org.apache.ode.bpel.iapi.Message; -import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; -import org.apache.ode.bpel.iapi.Scheduler; -import org.apache.ode.bpel.iapi.MessageExchange.FailureType; -import org.apache.ode.jbi.msgmap.Mapper; -import org.apache.ode.jbi.msgmap.MessageTranslationException; - import java.util.Collection; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import javax.jbi.messaging.*; +import javax.jbi.messaging.ExchangeStatus; +import javax.jbi.messaging.Fault; +import javax.jbi.messaging.InOnly; +import javax.jbi.messaging.InOut; +import javax.jbi.messaging.MessageExchange; +import javax.jbi.messaging.MessageExchangeFactory; +import javax.jbi.messaging.MessagingException; +import javax.jbi.messaging.NormalizedMessage; import javax.jbi.servicedesc.ServiceEndpoint; import javax.xml.namespace.QName; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.ContextException; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; +import org.apache.ode.bpel.iapi.MessageExchange.FailureType; +import org.apache.ode.jbi.msgmap.Mapper; +import org.apache.ode.jbi.msgmap.MessageTranslationException; /** * Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE - * that is destined for other JBI providers. + * that is destined for other JBI providers. */ -abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor { +class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor { private static final Log __log = LogFactory.getLog(OdeConsumer.class); - private static final long DEFAULT_RESPONSE_TIMEOUT = Long.getLong("org.apache.ode.jbi.timeout", 2 * 60 * 1000L); protected OdeContext _ode; - protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT; - - - protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String, PartnerRoleMessageExchange>(); - OdeConsumer(OdeContext ode) { _ode = ode; } @@ -100,35 +97,15 @@ NormalizedMessage nmsg = inonly.createMessage(); mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null); inonly.setInMessage(nmsg); - _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() { - public void afterCompletion(boolean success) { - if (success) { - doSendOneWay(odeMex, inonly); - } - } - public void beforeCompletion() { - } - - }); + doSendJBI(odeMex, inonly); odeMex.replyOneWayOk(); } else { final InOut inout = (InOut) jbiMex; NormalizedMessage nmsg = inout.createMessage(); mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null); inout.setInMessage(nmsg); - _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() { - public void afterCompletion(boolean success) { - if (success) { - doSendTwoWay(odeMex, inout); - } - } - - public void beforeCompletion() { - } - - }); - - odeMex.replyAsync(); + doSendJBI(odeMex, inout); + odeMex.replyAsync(inout.getExchangeId()); } } catch (MessagingException me) { String errmsg = "JBI messaging error for ODE MEX " + odeMex; @@ -142,14 +119,9 @@ } - protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly); - - protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout); - - public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException { - if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) && - !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) { + if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) + && !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) { __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern()); return; } @@ -162,60 +134,42 @@ } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) { outFailure((InOut) jbiMex); } else if (jbiMex.getStatus() == ExchangeStatus.DONE) { - _outstandingExchanges.remove(jbiMex.getExchangeId()); + ; // anything todo here? } else { __log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange: " + jbiMex.getExchangeId()); } } private void outFailure(final InOut jbiMex) { - final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(jbiMex.getExchangeId()); + PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId()); if (pmex == null) { - __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId()); + __log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId()); return; } - - try { - _ode._scheduler.execTransaction(new Callable<Boolean>() { - public Boolean call() throws Exception { - pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null); - return null; - } - }); - } catch (Exception ex) { - __log.error("error delivering failure: ", ex); - } - + + pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null); } private void outResponse(final InOut jbiMex) { - final PartnerRoleMessageExchange outstanding = _outstandingExchanges.remove(jbiMex.getExchangeId()); - if (outstanding == null) { - __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId()); + + PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId()); + if (pmex == null) { + __log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId()); return; } - - try { - _ode._scheduler.execTransaction(new Callable<Boolean>() { - @SuppressWarnings("unchecked") - public Boolean call() throws Exception { - // need to reload mex since we're in a different transaction - PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(outstanding.getMessageExchangeId()); - if (pmex == null) { - __log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId()); - return Boolean.FALSE; - } - String mapperName = pmex.getProperty(Mapper.class.getName()); - Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName); - if (mapper == null) { - String errmsg = "Mapper not found."; - __log.error(errmsg); - pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null); - } else { - try { - Fault jbiFlt = jbiMex.getFault(); - if (jbiFlt != null) { - javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex.getOperation().getFaults().values()); + + String mapperName = pmex.getProperty(Mapper.class.getName()); + Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName); + if (mapper == null) { + String errmsg = "Mapper not found."; + __log.error(errmsg); + pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null); + } else { + try { + Fault jbiFlt = jbiMex.getFault(); + if (jbiFlt != null) { + javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex + .getOperation().getFaults().values()); if (wsdlFlt == null) { pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null); } else { @@ -231,30 +185,34 @@ + wsdlFlt.getName(), null); } } - } else { - Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName()); - mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage()); - pmex.reply(response); - } - } catch (MessageTranslationException mte) { - __log.error("Error translating message.", mte); - pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null); - } - } - return null; + } else { + Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName()); + mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage()); + pmex.reply(response); } - }); - } catch (Exception ex) { - __log.error("error delivering RESPONSE: ", ex); - + } catch (MessageTranslationException mte) { + __log.error("Error translating message.", mte); + pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null); + } } } - public void setResponseTimeout(long timeout) { - _responseTimeout = timeout; + protected void doSendJBI(final PartnerRoleMessageExchange odeMex, final MessageExchange jbiMex) { + try { + switch (odeMex.getInvocationStyle()) { + case ASYNC: + _ode.getChannel().send(jbiMex); + break; + case BLOCKING: + _ode.getChannel().sendSync(jbiMex, odeMex.getTimeout()); + break; + default: + throw new ContextException("Unsupported Invocation Style: " + odeMex.getInvocationStyle()); + } + } catch (MessagingException e) { + String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex; + __log.error(errmsg, e); + } } - public long getResponseTimeout() { - return _responseTimeout; - } } Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java?view=diff&rev=551260&r1=551259&r2=551260 ============================================================================== --- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java (original) +++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java Wed Jun 27 11:35:17 2007 @@ -84,12 +84,8 @@ _ode = OdeContext.getInstance(); _ode.setContext(context); - // Use system property to determine if DeliveryChannel.sendSync or DeliveryChannel.send is used. - if (Boolean.getBoolean("org.apache.ode.jbi.sendSynch")) - _ode._consumer = new OdeConsumerSync(_ode); - else - _ode._consumer = new OdeConsumerAsync(_ode); - + _ode._consumer = new OdeConsumer(_ode); + if (_ode.getContext().getWorkspaceRoot() != null) TempFileManager.setWorkingDirectory(new File(_ode.getContext().getWorkspaceRoot())); Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java?view=diff&rev=551260&r1=551259&r2=551260 ============================================================================== --- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java (original) +++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeService.java Wed Jun 27 11:35:17 2007 @@ -18,16 +18,6 @@ */ package org.apache.ode.jbi; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.iapi.Endpoint; -import org.apache.ode.bpel.iapi.Message; -import org.apache.ode.bpel.iapi.MessageExchange.Status; -import org.apache.ode.bpel.iapi.MyRoleMessageExchange; -import org.apache.ode.jbi.msgmap.Mapper; -import org.apache.ode.jbi.msgmap.MessageTranslationException; -import org.w3c.dom.Element; - import javax.jbi.JBIException; import javax.jbi.messaging.ExchangeStatus; import javax.jbi.messaging.Fault; @@ -37,8 +27,17 @@ import javax.jbi.messaging.NormalizedMessage; import javax.jbi.servicedesc.ServiceEndpoint; import javax.xml.namespace.QName; -import java.util.HashMap; -import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.InvocationStyle; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange; +import org.apache.ode.bpel.iapi.MessageExchange.Status; +import org.apache.ode.jbi.msgmap.Mapper; +import org.apache.ode.jbi.msgmap.MessageTranslationException; +import org.w3c.dom.Element; /** * Bridge JBI (consumer) to ODE (provider). @@ -47,9 +46,6 @@ private static final Log __log = LogFactory.getLog(OdeService.class); - /** utility for tracking outstanding JBI message exchanges. */ - private final JbiMexTracker _jbiMexTracker = new JbiMexTracker(); - /** JBI-Generated Endpoint */ private ServiceEndpoint _internal; @@ -111,14 +107,12 @@ if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) { // We can forget about the exchange. - __log.debug("Consuming MEX tracker " + jbiMex.getExchangeId()); - _jbiMexTracker.consume(jbiMex.getExchangeId()); return; } if (jbiMex.getOperation() == null) { - throw new IllegalArgumentException("Null operation in JBI message exchange id=" + jbiMex.getExchangeId() - + " endpoint=" + _endpoint); + throw new IllegalArgumentException("Null operation in JBI message exchange id=" + jbiMex.getExchangeId() + " endpoint=" + + _endpoint); } if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) { @@ -154,18 +148,17 @@ __log.error("Unexpected error invoking ODE.", t); err = new RuntimeException(t); } finally { - // If we got an error that wasn't sent. + // If we got an error that wasn't sent. if (jbiMex.getStatus() == ExchangeStatus.ACTIVE && !success) { - if (err != null && jbiMex.getError() != null) { + if (err != null && jbiMex.getError() != null) { jbiMex.setError(err); } - jbiMex.setStatus(ExchangeStatus.ERROR); - _ode.getChannel().send(jbiMex); - } + jbiMex.setStatus(ExchangeStatus.ERROR); + _ode.getChannel().send(jbiMex); + } } } else { - __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " - + jbiMex.getPattern()); + __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern()); jbiMex.setStatus(ExchangeStatus.ERROR); jbiMex.setError(new Exception("Unknown message exchange pattern: " + jbiMex.getPattern())); } @@ -173,20 +166,12 @@ } /** - * Called from - * [EMAIL PROTECTED] MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)} + * Called from [EMAIL PROTECTED] MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)} * * @param mex * message exchenge */ - public void onResponse(MyRoleMessageExchange mex) { - __log.debug("Consuming MEX tracker " + mex.getClientId()); - javax.jbi.messaging.MessageExchange jbiMex = _jbiMexTracker.consume(mex.getClientId()); - if (jbiMex == null) { - __log.warn("Ignoring unknown async reply: " + mex); - return; - } - + public void onResponse(MyRoleMessageExchange mex, javax.jbi.messaging.MessageExchange jbiMex) { switch (mex.getStatus()) { case FAULT: outResponseFault(mex, jbiMex); @@ -210,70 +195,47 @@ */ private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, NormalizedMessage request) throws Exception { - // If this has already been tracked, we will not invoke! - if (_jbiMexTracker.track(jbiMex)) { - __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", already received!"); - return; + MyRoleMessageExchange odeMex; + if (__log.isDebugEnabled()) { + __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint=" + _endpoint + " operation=" + + jbiMex.getOperation()); } - _ode.getTransactionManager().begin(); + odeMex = _ode._server.createMessageExchange(InvocationStyle.BLOCKING, _endpoint.serviceName, jbiMex.getOperation() + .getLocalPart(), jbiMex.getExchangeId()); - boolean success = false; - MyRoleMessageExchange odeMex = null; - try { - if (__log.isDebugEnabled()) { - __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + " endpoint=" + _endpoint - + " operation=" + jbiMex.getOperation()); - } - odeMex = _ode._server.getEngine().createMessageExchange(jbiMex.getExchangeId(), _endpoint.serviceName, - jbiMex.getOperation().getLocalPart()); - - if (odeMex.getOperation() != null) { - copyMexProperties(odeMex, jbiMex); - javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage(); - Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName()); - Mapper mapper = _ode.findMapper(request, odeMex.getOperation()); - if (mapper == null) { - String errmsg = "Could not find a mapper for request message for JBI MEX " + jbiMex.getExchangeId() - + "; ODE MEX " + odeMex.getMessageExchangeId() + " is failed. "; - __log.error(errmsg); - throw new MessageTranslationException(errmsg); - - } - odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName()); - mapper.toODE(odeRequest, request, msgdef); - odeMex.invoke(odeRequest); - - // Handle the response if it is immediately available. - if (odeMex.getStatus() != Status.ASYNC) { - __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY."); - onResponse(odeMex); - _jbiMexTracker.consume(jbiMex.getExchangeId()); - } else { - __log.debug("ODE MEX " + odeMex + " completed ASYNCHRONOUSLY."); - } - } else { - __log.error("ODE MEX " + odeMex + " was unroutable."); - sendError(jbiMex, new IllegalArgumentException("Unroutable invocation.")); - } + if (odeMex.getOperation() == null) { + __log.error("ODE MEX " + odeMex + " was unroutable."); + sendError(jbiMex, new IllegalArgumentException("Unroutable invocation.")); + return; + } - success = true; - // For one-way invocation we do not need to maintain the association - if (jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) { - __log.debug("Consuming non Req/Res MEX tracker " + jbiMex.getExchangeId() + " with pattern " + jbiMex.getPattern()); - _jbiMexTracker.consume(jbiMex.getExchangeId()); - } + copyMexProperties(odeMex, jbiMex); + javax.wsdl.Message msgdef = odeMex.getOperation().getInput().getMessage(); + Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName()); + Mapper mapper = _ode.findMapper(request, odeMex.getOperation()); + if (mapper == null) { + String errmsg = "Could not find a mapper for request message for JBI MEX " + jbiMex.getExchangeId() + "; ODE MEX " + + odeMex.getMessageExchangeId() + " is failed. "; + __log.error(errmsg); + throw new MessageTranslationException(errmsg); - } finally { - if (success) { - __log.debug("Commiting ODE MEX " + odeMex); - _ode.getTransactionManager().commit(); + } + odeMex.setProperty(Mapper.class.getName(), mapper.getClass().getName()); + mapper.toODE(odeRequest, request, msgdef); + odeMex.setRequest(odeRequest); + try { + odeMex.invokeBlocking(); + // Handle the response if it is immediately available. + if (odeMex.getStatus() != Status.ASYNC) { + __log.debug("ODE MEX " + odeMex + " completed SYNCHRONOUSLY."); + onResponse(odeMex, jbiMex); } else { - __log.debug("Rolling back ODE MEX " + odeMex); - _jbiMexTracker.consume(jbiMex.getExchangeId()); - _ode.getTransactionManager().rollback(); - + __log.fatal("ODE MEX " + odeMex + " unexpectedly completed ASYNCHRONOUSLY."); } + } catch (Exception ex) { + __log.error("ODE MEX " + odeMex + " resulted in an error."); + sendError(jbiMex, ex); } } @@ -335,7 +297,7 @@ QName fault = mex.getFault(); javax.wsdl.Fault wsdlFault = mex.getOperation().getFault(fault.getLocalPart()); if (wsdlFault == null) { - sendError(jbiMex, new MessageTranslationException("Unmapped Fault : " + fault + ": " + mex.getFaultExplanation())); + sendError(jbiMex, new MessageTranslationException("Unmapped Fault : " + fault + ": " + mex.getFaultExplanation())); } else { mapper.toNMS(flt, mex.getFaultResponse(), wsdlFault.getMessage(), fault); inout.setFault(flt); @@ -349,7 +311,7 @@ sendError(jbiMex, mte); } } - + private void sendError(javax.jbi.messaging.MessageExchange jbiMex, Exception error) { try { jbiMex.setError(error); @@ -364,25 +326,4 @@ return _endpoint; } - /** - * Class for tracking outstanding message exchanges from JBI. - */ - private static class JbiMexTracker { - /** - * Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI - * MEX - */ - private Map<String, javax.jbi.messaging.MessageExchange> _outstandingJbiExchanges = new HashMap<String, javax.jbi.messaging.MessageExchange>(); - - synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) { - boolean found = _outstandingJbiExchanges.containsKey(jbiMex.getExchangeId()); - _outstandingJbiExchanges.put(jbiMex.getExchangeId(), jbiMex); - return found; - } - - synchronized javax.jbi.messaging.MessageExchange consume(String clientId) { - return _outstandingJbiExchanges.remove(clientId); - } - - } }