Author: mszefler Date: Mon Jun 25 17:58:15 2007 New Revision: 550650 URL: http://svn.apache.org/viewvc?view=rev&rev=550650 Log: BART
Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java - copied, changed from r550560, incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Removed: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java (original) +++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java Mon Jun 25 17:58:15 2007 @@ -19,7 +19,6 @@ package org.apache.ode.bpel.iapi; -import javax.xml.namespace.QName; /** * Interface exposing the BPEL "engine". 
Basically, this interface facilitates @@ -34,26 +33,5 @@ */ public interface BpelEngine extends Scheduler.JobProcessor { - /** - * Create a "my role" message exchange for invoking a BPEL process. - * - * @param serviceId - * the service id of the process being called, if known - * @param operation - * name of the operation - * - * @return [EMAIL PROTECTED] MyRoleMessageExchange} the newly created message exchange - */ - MyRoleMessageExchange createMessageExchange(String clientKey, QName serviceId, String operation) - throws BpelEngineException; - - /** - * Retrieve a message identified by the given identifer. - * - * @param mexId - * message exhcange identifier - * @return associated message exchange - */ - MessageExchange getMessageExchange(String mexId); - + } Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java (original) +++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java Mon Jun 25 17:58:15 2007 @@ -18,12 +18,13 @@ */ package org.apache.ode.bpel.iapi; +import java.util.Set; + import javax.xml.namespace.QName; /** - * Interface implemented by the BPEL server. Provides methods for - * life-cycle management. + * Interface implemented by the BPEL server. Provides methods for life-cycle management and process invocation. * * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m */ @@ -116,5 +117,39 @@ * @throws BpelEngineException */ void unregister(QName pid) throws BpelEngineException; + + + /** + * Inquire of the engine the invocation styles that are supported for a given service. 
+ * @param serviceId service identifier + * @return set of supported {@link InvocationStyle}s + */ + Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId); + + /** + * Create a "my role" message exchange for invoking a BPEL process. + * + * @param serviceId + * the service id of the process being called, if known + * @param operation + * name of the operation + * + * @return {@link MyRoleMessageExchange} the newly created message exchange + */ + MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, + QName serviceId, + String operation, + String clientKey) + throws BpelEngineException; + + /** + * Retrieve a message identified by the given identifer. + * + * @param mexId + * message exhcange identifier + * @return associated message exchange + */ + MessageExchange getMessageExchange(String mexId) + throws BpelEngineException; } Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java (original) +++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java Mon Jun 25 17:58:15 2007 @@ -56,6 +56,8 @@ */ CorrelationStatus getCorrelationStatus(); + void setRequest(Message request); + /** * "Invoke" a process hosted by the BPEL engine. The state of the invocation * may be obtained by a call to the {@link MessageExchange#getStatus()} @@ -68,20 +70,19 @@ * {@link MessageExchangeContext#onAsyncReply(MyRoleMessageExchange)} when * the response become available. 
*/ - Future<MessageExchange.Status> invoke(Message request); + void invokeBlocking(); + void invokeReliable(); + + void invokeAsync(); + + void invokeTransacted(); + /** * Complete the message, exchange: indicates that the client has receive the * response (if any). */ void complete(); - - /** - * Associate a client key with this message exchange. - * - * @param clientKey - */ - void setClientId(String clientKey); /** * Get the previously associated client key for this exchange. Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Mon Jun 25 17:58:15 2007 @@ -29,6 +29,7 @@ import org.apache.ode.bpel.iapi.BpelEngineException; import org.apache.ode.bpel.iapi.ContextException; import org.apache.ode.bpel.iapi.Endpoint; +import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.Message; import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern; @@ -106,10 +107,13 @@ _contexts = contexts; } - public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, - String operation, String pipedMexId) + MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, QName targetService, String operation, String clientKey) throws BpelEngineException { + // TODO: for now, invocation of the engine is only supported in RELIABLE mode. 
+ if (istyle != InvocationStyle.RELIABLE) + throw new BpelEngineException("Unsupported InvocationStyle: " + istyle); + BpelProcess target = route(targetService, null); MessageExchangeDAO dao; @@ -125,7 +129,7 @@ dao.setStatus(Status.NEW.toString()); dao.setOperation(operation); dao.setPipedMessageExchangeId(pipedMexId); - MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, dao); + ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, dao); if (target != null) { target.initMyRoleMex(mex); @@ -134,11 +138,7 @@ return mex; } - public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) { - return createMessageExchange(clientKey, targetService, operation, null); - } - - public MessageExchange getMessageExchange(String mexId) throws BpelEngineException { + MessageExchange getMessageExchange(String mexId) throws BpelEngineException { MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId); if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId); if (mexdao == null) @@ -167,7 +167,7 @@ } break; case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE: - mex = new MyRoleMessageExchangeImpl(this, mexdao); + mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao); if (process != null) { OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId()); PortType ptype = plink.myRolePortType; Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Mon Jun 25 17:58:15 2007 @@ -148,7 +148,7 @@ * * @param mex */ - void invokeProcess(MyRoleMessageExchangeImpl mex) { + void invokeProcess(ReliableMyRoleMessageExchangeImpl mex) { _hydrationLatch.latch(1); try { PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName()); @@ -179,7 +179,7 @@ } } - private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) { + private MessageExchangeDAO getDAO(ReliableMyRoleMessageExchangeImpl mex) { } @@ -191,7 +191,7 @@ return null; } - void initMyRoleMex(MyRoleMessageExchangeImpl mex) { + void initMyRoleMex(ReliableMyRoleMessageExchangeImpl mex) { markused(); PartnerLinkMyRoleImpl target = null; for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) { @@ -274,7 +274,7 @@ * message exchange * @return <code>true</code> if execution should continue, <code>false</code> otherwise */ - boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) { + boolean processInterceptors(ReliableMyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) { InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf); for (MessageExchangeInterceptor i : _mexInterceptors) @@ -307,7 +307,7 @@ if (__log.isDebugEnabled()) { __log.debug("InvokeInternal event for mexid " + we.getMexId()); } - MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId()); + ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId()); invokeProcess(mex); } else { // Instance level events @@ -612,7 +612,7 @@ /** Create a version-appropriate runtime context. 
*/ BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template, - MyRoleMessageExchangeImpl instantiatingMessageExchange) { + ReliableMyRoleMessageExchangeImpl instantiatingMessageExchange) { return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange); } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Jun 25 17:58:15 2007 @@ -591,7 +591,7 @@ } else /* IL-mediated communication */ { // TODO: distinguish between different kinds of my-role mexss - MyRoleMessageExchangeImpl myRoleMex = new MyRoleMessageExchangeImpl(); + ReliableMyRoleMessageExchangeImpl myRoleMex = new ReliableMyRoleMessageExchangeImpl(); _bpelProcess._engine._contexts.mexContext.onAsyncReply(myRoleMex); } @@ -1121,7 +1121,7 @@ for (String mexId : mexRefs) { MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId); if (mexDao != null) { - MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); + ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); switch (mex.getStatus()) { case ASYNC: case RESPONSE: @@ -1146,7 +1146,7 @@ for (String mexId : mexRefs) { MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId); if (mexDao != null) { - MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); + ReliableMyRoleMessageExchangeImpl 
mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); _bpelProcess.initMyRoleMex(mex); Message message = mex.createMessage(faultData.getFaultName()); @@ -1165,7 +1165,7 @@ String[] mexRefs = _outstandingRequests.releaseAll(); for (String mexId : mexRefs) { MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId); - MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); + ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); _bpelProcess.initMyRoleMex(mex); mex.setFailure(FailureType.OTHER, "No response.", null); _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex); Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Mon Jun 25 17:58:15 2007 @@ -42,7 +42,7 @@ /** * Base implementation of the [EMAIL PROTECTED] MessageExchange} interface. This interfaces is exposed to the Integration Layer (IL) - * to allow it to implement incoming (via [EMAIL PROTECTED] MyRoleMessageExchangeImpl}) and outgoing (via [EMAIL PROTECTED] PartnerRoleMessageExchangeImpl}) + * to allow it to implement incoming (via [EMAIL PROTECTED] ReliableMyRoleMessageExchangeImpl}) and outgoing (via [EMAIL PROTECTED] PartnerRoleMessageExchangeImpl}) * communications. * * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. 
It is imperative that the integration layer @@ -236,9 +236,6 @@ return _portType; } - QName getServiceName() { - return _callee; - } public Message getRequest() { if (_request != null) return _request; @@ -379,9 +376,18 @@ public String toString() { return "MEX[" + _mexId + "]"; } + + protected void assertTransaction() { + if (!_contexts.scheduler.isTransacted()) + throw new BpelEngineException("Operation must be performed in a transaction!"); + } - protected <T> T doInDb(InDbAction<T> name) { - throw new UnsupportedOperationException(); + protected <T> T doInDb(InDbAction<T> action) { + if (_txflag) { + MessageExchangeDAO mexDao; + action.call(mexDao); + } else { + } } interface InDbAction<T> { Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=550650&r1=550649&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Mon Jun 25 17:58:15 2007 @@ -83,7 +83,7 @@ * @param mex * exchange to which the message is related */ - public void invokeMyRole(MyRoleMessageExchangeImpl mex) { + public void invokeMyRole(ReliableMyRoleMessageExchangeImpl mex) { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex })); @@ -266,7 +266,7 @@ return op; } - private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl mex) { + private CorrelationKey[] computeCorrelationKeys(ReliableMyRoleMessageExchangeImpl mex) { Operation operation = mex.getOperation(); Element msg = mex.getRequest().getMessage(); 
javax.wsdl.Message msgDescription = operation.getInput().getMessage(); Copied: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (from r550560, incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java) URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=550650&p1=incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java&r1=550560&p2=incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java&r2=550650 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Mon Jun 25 17:58:15 2007 @@ -21,12 +21,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.dao.MessageDAO; import org.apache.ode.bpel.dao.MessageExchangeDAO; import org.apache.ode.bpel.iapi.BpelEngineException; +import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.Message; import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.MyRoleMessageExchange; import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus; import org.apache.ode.bpel.intercept.AbortMessageExchangeException; import org.apache.ode.bpel.intercept.FaultMessageExchangeException; import org.apache.ode.bpel.intercept.InterceptorInvoker; @@ -41,25 +44,49 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -class 
MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange { +/** + * Provides an implementation of the {@link MyRoleMessageExchange} inteface for interactions performed in the + * {@link InvocationStyle#RELIABLE} style. + * + * @author Maciej Szefler + */ +class ReliableMyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange { + + private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class); - private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class); public static final int TIMEOUT = 2 * 60 * 1000; - private static Map<String, ResponseFuture> _waitingFutures = - new ConcurrentHashMap<String, ResponseFuture>(); + private static Map<String, ResponseFuture> _waitingFutures = new ConcurrentHashMap<String, ResponseFuture>(); + + private CorrelationStatus _cstatus; + + private String _clientId; + public ReliableMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + super(engine, mexId); - public MyRoleMessageExchangeImpl() { - super(engine, mexdao); + // RELIABLE means we are bound to a transaction + _txflag = true; } public CorrelationStatus getCorrelationStatus() { - return CorrelationStatus.valueOf(getDAO().getCorrelationStatus()); + return _cstatus; } - void setCorrelationStatus(CorrelationStatus status) { - getDAO().setCorrelationStatus(status.toString()); + @Override + void load(MessageExchangeDAO dao) { + super.load(dao); + if (_cstatus == null) + _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus()); + if (_clientId == null) + _clientId = dao.getCorrelationId(); + } + + @Override + public void save(MessageExchangeDAO dao) { + super.save(dao); + dao.setCorrelationStatus(_cstatus.toString()); + dao.setCorrelationId(_clientId); } /** @@ -67,21 +94,19 @@ * * @param mex * message exchange - * @return <code>true</code> if execution should continue, - * <code>false</code> otherwise + * @return 
<code>true</code> if execution should continue, <code>false</code> otherwise */ - private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) { - InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), - mex._dao.getProcess(), null); + private boolean processInterceptors(InterceptorInvoker invoker, MessageExchangeDAO mexDao) { + InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), mexDao.getProcess(), null); for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors()) - if (!processInterceptor(i, mex, ictx, invoker)) + if (!processInterceptor(i, this, ictx, invoker)) return false; return true; } - boolean processInterceptor(MessageExchangeInterceptor i, MyRoleMessageExchangeImpl mex, InterceptorContext ictx, + boolean processInterceptor(MessageExchangeInterceptor i, ReliableMyRoleMessageExchangeImpl mex, InterceptorContext ictx, InterceptorInvoker invoker) { __log.debug(invoker + "--> interceptor " + i); try { @@ -99,40 +124,54 @@ return true; } - public Future<MessageExchange.Status> invoke(Message request) { + public Future<MessageExchange.Status> invoke(final Message request) { if (request == null) { String errmsg = "Must pass non-null message to invoke()!"; - __log.fatal(errmsg); throw new NullPointerException(errmsg); } - _dao.setRequest(((MessageImpl) request)._dao); - setStatus(MessageExchange.Status.REQUEST); - - if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked)) { - throw new BpelEngineException("Intercepted."); - } - - BpelProcess target = _engine.route(getDAO().getCallee(), request); - - if (__log.isDebugEnabled()) - __log.debug("invoke() EPR= " + _epr + " ==> " + target); + // For reliable, we MUST HAVE A TRANSACTION! 
+ assertTransaction(); - - ResponseFuture future = new ResponseFuture(); - + BpelProcess target = _engine.route(_callee, request); if (target == null) { if (__log.isWarnEnabled()) __log.warn(__msgs.msgUnknownEPR("" + _epr)); - setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT); + ResponseFuture future = new ResponseFuture(); + + _cstatus = MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT; setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null, null); - future.done(_lastStatus); - } else { + future.done(_status); + + return future; + } + + doInDb(new InDbAction<Void>() { + + public Void call(MessageExchangeDAO mexdao) { + // TODO: perhaps we should check if already backed by DB? + MessageDAO msgDao = mexdao.createMessage(request.getType()); + msgDao.setData(request.getMessage()); + setStatus(MessageExchange.Status.REQUEST); + + if (!processInterceptors(this, InterceptorInvoker.__onBpelServerInvoked)) { + throw new BpelEngineException("Intercepted."); + } + + if (__log.isDebugEnabled()) + __log.debug("invoke() EPR= " + _epr + " ==> " + target); + + } + + }); + + { // Schedule a new job for invocation WorkEvent we = new WorkEvent(); we.setType(WorkEvent.Type.INVOKE_INTERNAL); - if (target.isInMemory()) we.setInMem(true); + if (target.isInMemory()) + we.setInMem(true); we.setProcessId(target.getPID()); we.setMexId(getDAO().getMessageExchangeId()); @@ -144,7 +183,6 @@ future.done(_lastStatus); } - if (target.isInMemory()) _engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail()); else @@ -158,16 +196,8 @@ public void complete() { } - public QName getServiceName() { - return getDAO().getCallee(); - } - - public void setClientId(String clientKey) { - getDAO().setCorrelationId(clientKey); - } - public String getClientId() { - return getDAO().getCorrelationId(); + return _clientId; } public String toString() { @@ -179,11 +209,6 @@ } } - public boolean isAsynchronous() { - return true; - } - - protected void 
responseReceived() { final String mexid = getMessageExchangeId(); _engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() { @@ -192,18 +217,19 @@ ResponseFuture callback = _waitingFutures.remove(mexid); callback.done(_lastStatus); } + public void beforeCompletion() { } }); } - + private static class ResponseFuture implements Future<Status> { private Status _status; public boolean cancel(boolean mayInterruptIfRunning) { return false; } - + public Status get() throws InterruptedException, ExecutionException { try { return get(0, TimeUnit.MILLISECONDS); @@ -212,22 +238,20 @@ throw new RuntimeException(e); } } - - public Status get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - - - synchronized(this) { + + public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + + synchronized (this) { if (_status != null) return _status; - + while (_status == null) { this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); } - + if (_status == null) throw new TimeoutException(); - + return _status; } } @@ -235,13 +259,13 @@ public boolean isCancelled() { return false; } - + public boolean isDone() { return _status != null; } - + void done(Status status) { - synchronized(this) { + synchronized (this) { _status = status; this.notifyAll(); }