Author: mszefler Date: Tue Jun 26 16:21:01 2007 New Revision: 550983 URL: http://svn.apache.org/viewvc?view=rev&rev=550983 Log: Implement MyRole MessageExchange objects.
Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (with props) incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java (with props) incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (with props) incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java (with props) incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (with props) Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=auto&rev=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (added) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -0,0 +1,105 @@ +package org.apache.ode.bpel.engine; + +import java.util.Map; +import 
java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.MessageExchange.Status; + +/** + * For invoking the engine using ASYNC style. + * + * @author Maciej Szefler + * + */ +public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl { + private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class); + + private static Map<String, ResponseFuture> _waitingFutures = new ConcurrentHashMap<String, ResponseFuture>(); + + public AsyncMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + super(engine, mexId); + } + + public Future<Status> invokeAsync() { + ResponseFuture future = new ResponseFuture(); + + BpelProcess target = _engine.route(_callee, _request); + if (target == null) { + if (__log.isWarnEnabled()) + __log.warn(__msgs.msgUnknownEPR("" + _epr)); + + _cstatus = CorrelationStatus.UKNOWN_ENDPOINT; + setFailure(FailureType.UNKNOWN_ENDPOINT, null, null); + future.done(_status); + + return future; + } + + scheduleInvoke(target); + + if (getOperation().getOutput() != null) { + _waitingFutures.put(getMessageExchangeId(), future); + } else { + future.done(getStatus()); + } + + return future; + + } + + private static class ResponseFuture implements Future<Status> { + private Status _status; + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + public Status get() throws InterruptedException, ExecutionException { + try { + return get(0, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // If it's thrown it's definitely a bug + throw new RuntimeException(e); + } + } + + public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + + synchronized 
(this) { + if (_status != null) + return _status; + + while (_status == null) { + this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); + } + + if (_status == null) + throw new TimeoutException(); + + return _status; + } + } + + public boolean isCancelled() { + return false; + } + + public boolean isDone() { + return _status != null; + } + + void done(Status status) { + synchronized (this) { + _status = status; + this.notifyAll(); + } + } + } + +} Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java?view=auto&rev=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java (added) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -0,0 +1,100 @@ +package org.apache.ode.bpel.engine; + +import javax.wsdl.Operation; +import javax.wsdl.PortType; +import javax.xml.namespace.QName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.bpel.iapi.BpelEngineException; +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.InvocationStyle; +import org.apache.ode.bpel.iapi.Message; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; +import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; +import org.w3c.dom.Element; + +/** + * Implementation of the {@link 
PartnerRoleMessageExchange} interface that is used when the ASYNC invocation + * style is being used (see {@link InvocationStyle#ASYNC}). The basic idea here is that with this style, the + * IL does not get the "message" (i.e. this object) until the ODE transaction has committed, and it does not + * block during the performance of the operation. Hence, when a reply becomes available, we'll need to + * schedule a transaction to process it. + * + * @author Maciej Szefler + * + */ +public class AsyncPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl { + + private static final Log __log = LogFactory.getLog(AsyncPartnerRoleMessageExchangeImpl.class); + + AsyncPartnerRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId, PortType portType, Operation operation, boolean inMem, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { + super(engine, mexId, portType, operation, inMem, epr, myRoleEPR, channel); + } + + public void replyWithFault(QName faultType, Message outputFaultMessage) throws BpelEngineException { + if(!isAsync()) + throw new BpelEngineException("Invalid action, message-exchange is not in ASYNC state!"); + + super.replyWithFault(faultType,outputFaultMessage); + scheduleContinuation(); + } + + public void reply(Message response) throws BpelEngineException { + if(!isAsync()) + throw new BpelEngineException("Invalid action, message-exchange is not in ASYNC state!"); + + super.reply(response); + scheduleContinuation(); + + } + + public void replyWithFailure(FailureType type, String description, Element details) throws BpelEngineException { + if(!isAsync()) + throw new BpelEngineException("Invalid action, message-exchange is not in ASYNC state!"); + super.replyWithFailure(type, description, details); + scheduleContinuation(); + } + + + /** + * Check if we are in the ASYNC state. 
+ * + * @return + */ + private boolean isAsync() { + return getStatus() == Status.ASYNC; + } + + + /** + * Continue from the ASYNC state by scheduling a continuation to process a response/fault/failure. + */ + private void scheduleContinuation() { + // If there is no channel waiting for us, there is nothing to do. + if (getPartnerRoleChannel() == null) { + if (__log.isDebugEnabled()) { + __log.debug("no channel on mex=" + getMessageExchangeId()); + } + return; + } + + + WorkEvent we = new WorkEvent(); + we.setIID(_iid); + we.setType(WorkEvent.Type.INVOKE_RESPONSE); + we.setInMem(_inMem); + we.setChannel(_responseChannel); + we.setMexId(_mexId); + + if (__log.isDebugEnabled()) { + __log.debug("scheduleContinuation: scheduling WorkEvent " + we); + } + + if (_inMem) + _contexts.scheduler.scheduleVolatileJob(true, we.getDetail()); + else + _contexts.scheduler.schedulePersistedJob(we.getDetail(), null); + } + +} Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=auto&rev=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (added) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -0,0 +1,34 @@ +package org.apache.ode.bpel.engine; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import 
org.apache.ode.bpel.iapi.BpelEngineException; +import org.apache.ode.bpel.iapi.MessageExchange.Status; + +public class BlockingMyRoleMessageExchangeImpl extends AsyncMyRoleMessageExchangeImpl { + Future<Status> _future; + boolean _done = false; + + public BlockingMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + super(engine, mexId); + } + + @Override + public Future<Status> invokeAsync() { + throw new BpelEngineException("Invalid invocation style, use invokeBlocking() instead."); + } + + @Override + public Status invokeBlocking() throws BpelEngineException, TimeoutException { + if (_done) + return _status; + if (_future != null) + _future.get(); + Future<Status> future = super.invokeAsync(); + + future.get(timeout, unit) + } + + +} Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java?view=auto&rev=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java (added) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -0,0 +1,27 @@ +package org.apache.ode.bpel.engine; + +import javax.wsdl.Operation; +import javax.wsdl.PortType; + +import org.apache.ode.bpel.iapi.EndpointReference; +import org.apache.ode.bpel.iapi.MessageExchangeContext; +import org.apache.ode.bpel.iapi.PartnerRoleChannel; + +/** + * Implementation of the {@link 
PartnerRoleMessageExchange} interface that is passed to the IL when the + * BLOCKING invocation style is used (see {@link InvocationStyle#BLOCKING}). The basic idea here is that + * with this style, the IL performs the operation while blocking in the + * {@link MessageExchangeContext#invokePartner(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)} method. + * + * This InvocationStyle makes this class rather trivial. + * + * @author Maciej Szefler + * + */ +public class BlockingPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl { + + BlockingPartnerRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId, PortType portType, Operation operation, boolean inMem, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) { + super(engine, mexId, portType, operation, inMem, epr, myRoleEPR, channel); + } + +} Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=550983&r1=550982&r2=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Jun 26 16:21:01 2007 @@ -54,12 +54,13 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * Implementation of the {@link BpelEngine} 
interface: provides the server methods that should be invoked in the context of a * transaction. - * + * * @author mszefler * @author Matthieu Riou <mriou at apache dot org> */ @@ -107,40 +108,89 @@ _contexts = contexts; } - MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, QName targetService, String operation, String clientKey) - throws BpelEngineException { + MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName targetService, final String operation, + final String clientKey) throws BpelEngineException { - // TODO: for now, invocation of the engine is only supported in RELIABLE mode. - if (istyle != InvocationStyle.RELIABLE) - throw new BpelEngineException("Unsupported InvocationStyle: " + istyle); - - BpelProcess target = route(targetService, null); - - MessageExchangeDAO dao; - if (target == null || target.isInMemory()) { - dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); - } else { - dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); - } - dao.setCorrelationId(clientKey); - dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString()); - dao.setPattern(MessageExchangePattern.UNKNOWN.toString()); - dao.setCallee(targetService); - dao.setStatus(Status.NEW.toString()); - dao.setOperation(operation); - dao.setPipedMessageExchangeId(pipedMexId); - ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, dao); + final BpelProcess target = route(targetService, null); + + if (target == null) + throw new BpelEngineException("NoSuchService: " + targetService); + + Callable<String> createDao = new Callable<String>() { + + public String call() throws Exception { + MessageExchangeDAO dao; + if (target.isInMemory()) { + dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); + } else { + dao = 
_contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); + } + dao.setInvocationStyle(istyle.toString()); + dao.setCorrelationId(clientKey); + dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString()); + dao.setPattern(MessageExchangePattern.UNKNOWN.toString()); + dao.setCallee(targetService); + dao.setStatus(Status.NEW.toString()); + dao.setOperation(operation); + return dao.getMessageExchangeId(); + } + + }; - if (target != null) { - target.initMyRoleMex(mex); + MyRoleMessageExchangeImpl mex; + String mexId; + switch (istyle) { + case ASYNC: + try { + mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get(); + } catch (Exception e) { + __log.error("Internal Error: could not execute isolated transaction.", e); + throw new BpelEngineException("Internal Error", e); + } + mex = new AsyncMyRoleMessageExchangeImpl(this, mexId); + break; + case BLOCKING: + try { + mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get(); + } catch (Exception e) { + __log.error("Internal Error: could not execute isolated transaction.", e); + throw new BpelEngineException("Internal Error", e); + } + mex = new BlockingMyRoleMessageExchangeImpl(this, mexId); + break; + + case RELIABLE: + assertTransaction(); + try { + mexId = createDao.call(); + } catch (Exception e) { + __log.error("Internal Error: could not execute DB calls.", e); + throw new BpelEngineException("Internal Error", e); + } + mex = new ReliableMyRoleMessageExchangeImpl(this, mexId); + break; + case TRANSACTED: + assertTransaction(); + try { + mexId = createDao.call(); + } catch (Exception e) { + __log.error("Internal Error: could not execute DB calls.", e); + throw new BpelEngineException("Internal Error", e); + } + mex = new TransactedMyRoleMessageExchangeImpl(this, mexId); + default: + throw new Error("Internal Error: unknown InvocationStyle: " + istyle); } + target.initMyRoleMex(mex); + return mex; } MessageExchange getMessageExchange(String 
mexId) throws BpelEngineException { MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId); - if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId); + if (mexdao == null) + mexdao = _contexts.dao.getConnection().getMessageExchange(mexId); if (mexdao == null) return null; @@ -203,7 +253,9 @@ /** * Register a process with the engine. - * @param process the process to register + * + * @param process + * the process to register */ void registerProcess(BpelProcess process) { _activeProcesses.put(process.getPID(), process); @@ -217,7 +269,7 @@ /** * Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes * would not be registered under the same service qname but different endpoint. - * + * * @param service * target service id * @param request @@ -260,7 +312,9 @@ public void afterCompletion(boolean success) { _instanceLockManager.unlock(we.getIID()); } - public void beforeCompletion() { } + + public void beforeCompletion() { + } }); } catch (InterruptedException e) { // Retry later. 
@@ -284,8 +338,10 @@ process = _activeProcesses.get(we.getProcessId()); } else { ProcessInstanceDAO instance; - if (we.isInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getIID()); - else instance = _contexts.dao.getConnection().getInstance(we.getIID()); + if (we.isInMem()) + instance = _contexts.inMemDao.getConnection().getInstance(we.getIID()); + else + instance = _contexts.dao.getConnection().getInstance(we.getIID()); if (instance == null) { __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); @@ -344,13 +400,13 @@ _contexts.scheduler.execIsolatedTransaction(new Callable<Void>() { public Void call() throws Exception { jobInfo.jobDetail.put("final", true); - _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, - new Date(System.currentTimeMillis() + 60 * 1000)); + _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date( + System.currentTimeMillis() + 60 * 1000)); return null; } }); } catch (Exception ex) { - __log.error("Error rescheduling problematic job: " + jobInfo,ex); + __log.error("Error rescheduling problematic job: " + jobInfo, ex); saveToDisk = true; } } else { @@ -363,12 +419,11 @@ ObjectOutputStream fos = new ObjectOutputStream(new FileOutputStream(f)); fos.writeObject(jobInfo); fos.close(); - __log.error("Saved problematic job to disk (last resort): " + jobInfo +" in file " + f); + __log.error("Saved problematic job to disk (last resort): " + jobInfo + " in file " + f); } catch (Exception ex) { __log.error("Could not save bad job; it will be lost: " + jobInfo, ex); } - // No more retries. return false; } @@ -408,11 +463,16 @@ /** * Get the list of globally-registered message-exchange interceptors. 
- * + * * @return list */ List<MessageExchangeInterceptor> getGlobalInterceptors() { return _contexts.globalIntereceptors; + } + + protected void assertTransaction() { + if (!_contexts.scheduler.isTransacted()) + throw new BpelEngineException("Operation must be performed in a transaction!"); } } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550983&r1=550982&r2=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jun 26 16:21:01 2007 @@ -191,7 +191,7 @@ return null; } - void initMyRoleMex(ReliableMyRoleMessageExchangeImpl mex) { + void initMyRoleMex(MyRoleMessageExchangeImpl mex) { markused(); PartnerLinkMyRoleImpl target = null; for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) { Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550983&r1=550982&r2=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Callable; import javax.wsdl.Operation; import javax.wsdl.PortType; @@ -41,15 
+42,15 @@ import org.w3c.dom.Element; /** - * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed to the Integration Layer (IL) - * to allow it to implement incoming (via {@link ReliableMyRoleMessageExchangeImpl}) and outgoing (via {@link PartnerRoleMessageExchangeImpl}) - * communications. + * Base implementation of the {@link MessageExchange} interface. This interfaces is exposed to the Integration Layer (IL) to allow + * it to implement incoming (via {@link ReliableMyRoleMessageExchangeImpl}) and outgoing (via + * {@link PartnerRoleMessageExchangeImpl}) communications. * * It should be noted that this class and its derived classes are in NO WAY THREADSAFE. It is imperative that the integration layer - * not attempt to use {@link MessageExchange} objects from multiple threads. + * not attempt to use {@link MessageExchange} objects from multiple threads. * * @author Maciej Szefler - * + * */ abstract class MessageExchangeImpl implements MessageExchange { @@ -92,49 +93,44 @@ Contexts _contexts; - QName _callee; - BpelEngineImpl _engine; boolean _associated; - + InvocationStyle _istyle; - + /** The point at which this message-exchange will time out. */ Date _timeout; - - enum Change { - EPR, - RESPONSE, - RELEASE + + enum Change { + EPR, RESPONSE, RELEASE, REQUEST } final HashSet<Change> _changes = new HashSet<Change>(); - + /** Properties that have been retrieved from the database. */ - final HashMap<String, String> _properties = new HashMap<String,String>(); - + final HashMap<String, String> _properties = new HashMap<String, String>(); + /** Names of properties that have been retrieved from the database. */ final HashSet<String> _loadedProperties = new HashSet<String>(); /** Names of proprties that have been modified. 
*/ final HashSet<String> _modifiedProperties = new HashSet<String>(); - + private FailureType _failureType; private Set<String> _propNames; - + public MessageExchangeImpl(BpelEngineImpl engine, String mexId) { _contexts = engine._contexts; _engine = engine; _mexId = mexId; } - void load(MessageExchangeDAO dao) { if (dao.getMessageExchangeId().equals(_mexId)) throw new IllegalArgumentException("MessageExchangeId mismatch!"); - + if (_pattern == null) _pattern = MessageExchangePattern.valueOf(dao.getPattern()); if (_opname == null) @@ -149,37 +145,44 @@ _explanation = dao.getFaultExplanation(); if (_status == null) _status = Status.valueOf(dao.getStatus()); - if (_callee == null) - _callee = dao.getCallee(); if (_istyle == null) _istyle = InvocationStyle.valueOf(dao.getInvocationStyle()); } - + public void save(MessageExchangeDAO dao) { dao.setStatus(_status.toString()); dao.setInvocationStyle(_istyle.toString()); dao.setFault(_fault); dao.setFaultExplanation(_explanation); - //todo: set failureType - + // todo: set failureType + if (_changes.contains(Change.RESPONSE)) { MessageDAO responseDao = dao.createMessage(_response.getType()); responseDao.setData(_response.getMessage()); } - + if (_changes.contains(Change.EPR)) { if (_epr != null) dao.setEPR(_epr.toXML().getDocumentElement()); else dao.setEPR(null); } - + for (String modprop : _modifiedProperties) { dao.setProperty(modprop, _properties.get(modprop)); } } - + + void save() { + doInTX(new InDbAction<Void>() { + public Void call(MessageExchangeDAO mexdao) { + save(mexdao); + return null; + } + }); + } + public InvocationStyle getInvocationStyle() { return _istyle; } @@ -240,12 +243,12 @@ if (_request != null) return _request; - return _request = doInDb(new InDbAction<MessageImpl>() { + return _request = doInTX(new InDbAction<MessageImpl>() { public MessageImpl call(MessageExchangeDAO dao) { MessageDAO req = dao.getRequest(); if (req == null) return null; - return new 
MemBackedMessageImpl(req.getData(),req.getType(),true); + return new MemBackedMessageImpl(req.getData(), req.getType(), true); } }); @@ -255,13 +258,13 @@ if (_response != null) return _response; - return _response = doInDb(new InDbAction<MessageImpl>() { + return _response = doInTX(new InDbAction<MessageImpl>() { public MessageImpl call(MessageExchangeDAO dao) { MessageDAO req = dao.getResponse(); if (req == null) return null; - return new MemBackedMessageImpl(req.getData(),req.getType(),true); - + return new MemBackedMessageImpl(req.getData(), req.getType(), true); + } }); } @@ -277,7 +280,7 @@ setStatus(Status.FAULT); _fault = faultType; _response = (MessageImpl) outputFaultMessage; - + _changes.add(Change.RESPONSE); } @@ -295,7 +298,7 @@ _response = (MessageImpl) outputMessage; _response.makeReadOnly(); _changes.add(Change.RESPONSE); - + } void setFailure(FailureType type, String reason, Element details) throws BpelEngineException { @@ -303,7 +306,7 @@ setStatus(Status.FAILURE); _failureType = type; _explanation = reason; - + _changes.add(Change.RESPONSE); } @@ -312,7 +315,7 @@ } public Message createMessage(javax.xml.namespace.QName msgType) { - return new MemBackedMessageImpl(null,msgType,false); + return new MemBackedMessageImpl(null, msgType, false); } public void setEndpointReference(EndpointReference ref) { @@ -324,7 +327,7 @@ if (_epr != null) return _epr; - return _epr = doInDb(new InDbAction<EndpointReference>() { + return _epr = doInTX(new InDbAction<EndpointReference>() { public EndpointReference call(MessageExchangeDAO mexdao) { Element eprdao = mexdao.getEPR(); @@ -335,14 +338,13 @@ } - public String getProperty(final String key) { if (!_loadedProperties.contains(key)) { - _properties.put(key, doInDb(new InDbAction<String> () { + _properties.put(key, doInTX(new InDbAction<String>() { public String call(MessageExchangeDAO mexdao) { return mexdao.getProperty(key); } - + })); _loadedProperties.add(key); } @@ -351,7 +353,7 @@ } public void 
setProperty(String key, String value) { - _properties.put(key,value); + _properties.put(key, value); _loadedProperties.add(key); _modifiedProperties.add(key); } @@ -359,13 +361,13 @@ public Set<String> getPropertyNames() { if (_propNames != null) return _propNames; - - return _propNames = doInDb(new InDbAction<Set<String>>() { + + return _propNames = doInTX(new InDbAction<Set<String>>() { public Set<String> call(MessageExchangeDAO mexdao) { return mexdao.getPropertyNames(); } }); - + } public void release() { @@ -376,22 +378,47 @@ public String toString() { return "MEX[" + _mexId + "]"; } - + protected void assertTransaction() { if (!_contexts.scheduler.isTransacted()) throw new BpelEngineException("Operation must be performed in a transaction!"); } - protected <T> T doInDb(InDbAction<T> action) { + protected <T> T doInTX(final InDbAction<T> action) { if (_txflag) { - MessageExchangeDAO mexDao; - action.call(mexDao); + assertTransaction(); + return action.call(getDAO()); } else { + try { + return _contexts.scheduler.execIsolatedTransaction(new Callable<T>() { + public T call() throws Exception { + assertTransaction(); + return action.call(getDAO()); + } + + }).get(); + } catch (Exception ie) { + __log.error("Internal error executing transaction.", ie); + throw new BpelEngineException("Internal Error",ie); + } } } + /** + * Get the DAO object. Note, we can do this only when we are running in a transaction. 
+ * + * @return + */ + protected MessageExchangeDAO getDAO() { + assertTransaction(); + MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(_mexId); + if (mexdao == null) + mexdao = _contexts.dao.getConnection().getMessageExchange(_mexId); + return mexdao; + } + interface InDbAction<T> { public T call(MessageExchangeDAO mexdao); } - + } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=550983&r1=550982&r2=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -95,20 +95,8 @@ __log.debug("invoke() EPR= " + _epr + " ==> " + target); setStatus(Status.REQUEST); save(getDAO()); - scheduleInvoke(target); - } - /** - * Get the DAO object. Note, we can do this for RELIABLE, since we are guaranteed to be running in - * a transaction. 
- * - * @return - */ - MessageExchangeDAO getDAO() { - MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(_mexId); - if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(_mexId); - return mexdao; - } + } Added: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=auto&rev=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (added) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Tue Jun 26 16:21:01 2007 @@ -0,0 +1,16 @@ +package org.apache.ode.bpel.engine; + +/** + * Transacted my-role message exchange. + * + * TODO: IMPLEMENT! 
+ * @author Maciej Szefler + * + */ +public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl { + + public TransactedMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + super(engine, mexId); + } + +} Propchange: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?view=diff&rev=550983&r1=550982&r2=550983 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Tue Jun 26 16:21:01 2007 @@ -60,7 +60,7 @@ } public enum Type { - TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL + TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL, INVOKE_TIMEOUT } public String getChannel() {