Author: mszefler
Date: Mon Jun 25 17:58:15 2007
New Revision: 550650

URL: http://svn.apache.org/viewvc?view=rev&rev=550650
Log:
BART

Added:
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
      - copied, changed from r550560, 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Removed:
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Modified:
    
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
    
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
    
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java

Modified: 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
 Mon Jun 25 17:58:15 2007
@@ -19,7 +19,6 @@
 
 package org.apache.ode.bpel.iapi;
 
-import javax.xml.namespace.QName;
 
 /**
  * Interface exposing the BPEL "engine". Basically, this interface facilitates
@@ -34,26 +33,5 @@
  */
 public interface BpelEngine extends Scheduler.JobProcessor {
 
-    /**
-     * Create a "my role" message exchange for invoking a BPEL process.
-     * 
-     * @param serviceId
-     *            the service id of the process being called, if known
-     * @param operation
-     *            name of the operation
-     * 
-     * @return [EMAIL PROTECTED] MyRoleMessageExchange} the newly created 
message exchange
-     */
-    MyRoleMessageExchange createMessageExchange(String clientKey, QName 
serviceId, String operation)
-            throws BpelEngineException;
-
-    /**
-     * Retrieve a message identified by the given identifer.
-     * 
-     * @param mexId
-     *            message exhcange identifier
-     * @return associated message exchange
-     */
-    MessageExchange getMessageExchange(String mexId);
-
+   
 }

Modified: 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelServer.java
 Mon Jun 25 17:58:15 2007
@@ -18,12 +18,13 @@
  */
 package org.apache.ode.bpel.iapi;
 
+import java.util.Set;
+
 import javax.xml.namespace.QName;
 
 
 /**
- * Interface implemented by the BPEL server. Provides methods for
- * life-cycle management.
+ * Interface implemented by the BPEL server. Provides methods for life-cycle 
management and process invocation. 
  * 
  * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m
  */
@@ -116,5 +117,39 @@
      * @throws BpelEngineException
      */
     void unregister(QName pid) throws BpelEngineException;
+
+    
+    /**
+     * Inquire of the engine the invocation styles that are supported for a 
given service. 
+     * @param serviceId service identifier 
+     * @return set of supported [EMAIL PROTECTED] InvocationStyle}s
+     */
+    Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId);
+    
+    /**
+     * Create a "my role" message exchange for invoking a BPEL process.
+     * 
+     * @param serviceId
+     *            the service id of the process being called, if known
+     * @param operation
+     *            name of the operation
+     * 
+     * @return [EMAIL PROTECTED] MyRoleMessageExchange} the newly created 
message exchange
+     */
+    MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, 
+            QName serviceId, 
+            String operation,
+            String clientKey)
+            throws BpelEngineException;
+
+    /**
+     * Retrieve a message identified by the given identifer.
+     * 
+     * @param mexId
+     *            message exhcange identifier
+     * @return associated message exchange
+     */
+    MessageExchange getMessageExchange(String mexId) 
+        throws BpelEngineException;
 
 }

Modified: 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MyRoleMessageExchange.java
 Mon Jun 25 17:58:15 2007
@@ -56,6 +56,8 @@
      */
     CorrelationStatus getCorrelationStatus();
 
+    void setRequest(Message request);
+    
     /**
      * "Invoke" a process hosted by the BPEL engine. The state of the 
invocation
      * may be obtained by a call to the [EMAIL PROTECTED] 
MessageExchange#getStatus()}
@@ -68,20 +70,19 @@
      * [EMAIL PROTECTED] 
MessageExchangeContext#onAsyncReply(MyRoleMessageExchange)} when
      * the response become available.
      */
-    Future<MessageExchange.Status> invoke(Message request);
+    void invokeBlocking();
 
+    void invokeReliable();
+    
+    void invokeAsync();
+    
+    void invokeTransacted();
+    
     /**
      * Complete the message, exchange: indicates that the client has receive 
the
      * response (if any).
      */
     void complete();
-
-    /**
-     * Associate a client key with this message exchange.
-     * 
-     * @param clientKey
-     */
-    void setClientId(String clientKey);
 
     /**
      * Get the previously associated client key for this exchange.

Modified: 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 Mon Jun 25 17:58:15 2007
@@ -29,6 +29,7 @@
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
@@ -106,10 +107,13 @@
         _contexts = contexts;
     }
 
-    public MyRoleMessageExchange createMessageExchange(String clientKey, QName 
targetService,
-                                                       String operation, 
String pipedMexId)
+    MyRoleMessageExchange createMessageExchange(InvocationStyle istyle, QName 
targetService, String operation, String clientKey)
             throws BpelEngineException {
 
+        // TODO: for now, invocation of the engine is only supported in 
RELIABLE mode.
+        if (istyle != InvocationStyle.RELIABLE)
+            throw new BpelEngineException("Unsupported InvocationStyle: " + 
istyle);
+        
         BpelProcess target = route(targetService, null);
 
         MessageExchangeDAO dao;
@@ -125,7 +129,7 @@
         dao.setStatus(Status.NEW.toString());
         dao.setOperation(operation);
         dao.setPipedMessageExchangeId(pipedMexId);
-        MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(this, 
dao);
+        ReliableMyRoleMessageExchangeImpl mex = new 
ReliableMyRoleMessageExchangeImpl(this, dao);
 
         if (target != null) {
             target.initMyRoleMex(mex);
@@ -134,11 +138,7 @@
         return mex;
     }
 
-    public MyRoleMessageExchange createMessageExchange(String clientKey, QName 
targetService, String operation) {
-        return createMessageExchange(clientKey, targetService, operation, 
null);        
-    }
-
-    public MessageExchange getMessageExchange(String mexId) throws 
BpelEngineException {
+    MessageExchange getMessageExchange(String mexId) throws 
BpelEngineException {
         MessageExchangeDAO mexdao = 
_contexts.inMemDao.getConnection().getMessageExchange(mexId);
         if (mexdao == null) mexdao = 
_contexts.dao.getConnection().getMessageExchange(mexId);
         if (mexdao == null)
@@ -167,7 +167,7 @@
             }
             break;
         case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-            mex = new MyRoleMessageExchangeImpl(this, mexdao);
+            mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao);
             if (process != null) {
                 OPartnerLink plink = (OPartnerLink) 
process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
                 PortType ptype = plink.myRolePortType;

Modified: 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Mon Jun 25 17:58:15 2007
@@ -148,7 +148,7 @@
      * 
      * @param mex
      */
-    void invokeProcess(MyRoleMessageExchangeImpl mex) {
+    void invokeProcess(ReliableMyRoleMessageExchangeImpl mex) {
         _hydrationLatch.latch(1);
         try {
             PartnerLinkMyRoleImpl target = 
getMyRoleForService(mex.getServiceName());
@@ -179,7 +179,7 @@
         }
     }
 
-    private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) {
+    private MessageExchangeDAO getDAO(ReliableMyRoleMessageExchangeImpl mex) {
 
     }
 
@@ -191,7 +191,7 @@
         return null;
     }
 
-    void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
+    void initMyRoleMex(ReliableMyRoleMessageExchangeImpl mex) {
         markused();
         PartnerLinkMyRoleImpl target = null;
         for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
@@ -274,7 +274,7 @@
      *            message exchange
      * @return <code>true</code> if execution should continue, 
<code>false</code> otherwise
      */
-    boolean processInterceptors(MyRoleMessageExchangeImpl mex, 
InterceptorInvoker invoker) {
+    boolean processInterceptors(ReliableMyRoleMessageExchangeImpl mex, 
InterceptorInvoker invoker) {
         InterceptorContextImpl ictx = new 
InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), 
_pconf);
 
         for (MessageExchangeInterceptor i : _mexInterceptors)
@@ -307,7 +307,7 @@
                 if (__log.isDebugEnabled()) {
                     __log.debug("InvokeInternal event for mexid " + 
we.getMexId());
                 }
-                MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) 
_engine.getMessageExchange(we.getMexId());
+                ReliableMyRoleMessageExchangeImpl mex = 
(ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
                 invokeProcess(mex);
             } else {
                 // Instance level events
@@ -612,7 +612,7 @@
 
     /** Create a version-appropriate runtime context. */
     BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, 
PROCESS template,
-            MyRoleMessageExchangeImpl instantiatingMessageExchange) {
+            ReliableMyRoleMessageExchangeImpl instantiatingMessageExchange) {
         return new BpelRuntimeContextImpl(this, dao, template, 
instantiatingMessageExchange);
 
     }

Modified: 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Mon Jun 25 17:58:15 2007
@@ -591,7 +591,7 @@
 
         } else /* IL-mediated communication */  {
             // TODO: distinguish between different kinds of my-role mexss
-            MyRoleMessageExchangeImpl myRoleMex = new 
MyRoleMessageExchangeImpl();
+            ReliableMyRoleMessageExchangeImpl myRoleMex = new 
ReliableMyRoleMessageExchangeImpl();
             _bpelProcess._engine._contexts.mexContext.onAsyncReply(myRoleMex);
         }
 
@@ -1121,7 +1121,7 @@
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = 
_dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-                MyRoleMessageExchangeImpl mex = new 
MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
+                ReliableMyRoleMessageExchangeImpl mex = new 
ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
                 switch (mex.getStatus()) {
                     case ASYNC:
                     case RESPONSE:
@@ -1146,7 +1146,7 @@
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = 
_dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-                MyRoleMessageExchangeImpl mex = new 
MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
+                ReliableMyRoleMessageExchangeImpl mex = new 
ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
                 _bpelProcess.initMyRoleMex(mex);
 
                 Message message = mex.createMessage(faultData.getFaultName());
@@ -1165,7 +1165,7 @@
         String[] mexRefs = _outstandingRequests.releaseAll();
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = 
_dao.getConnection().getMessageExchange(mexId);
-            MyRoleMessageExchangeImpl mex = new 
MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
+            ReliableMyRoleMessageExchangeImpl mex = new 
ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
             _bpelProcess.initMyRoleMex(mex);
             mex.setFailure(FailureType.OTHER, "No response.", null);
             _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);

Modified: 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
 Mon Jun 25 17:58:15 2007
@@ -42,7 +42,7 @@
 
 /**
  * Base implementation of the [EMAIL PROTECTED] MessageExchange} interface. 
This interfaces is exposed to the Integration Layer (IL)
- * to allow it to implement incoming (via [EMAIL PROTECTED] 
MyRoleMessageExchangeImpl}) and outgoing (via [EMAIL PROTECTED] 
PartnerRoleMessageExchangeImpl})
+ * to allow it to implement incoming (via [EMAIL PROTECTED] 
ReliableMyRoleMessageExchangeImpl}) and outgoing (via [EMAIL PROTECTED] 
PartnerRoleMessageExchangeImpl})
  * communications. 
  * 
  * It should be noted that this class and its derived classes are in NO WAY 
THREADSAFE. It is imperative that the integration layer
@@ -236,9 +236,6 @@
         return _portType;
     }
 
-    QName getServiceName() {
-        return _callee;
-    }
     public Message getRequest() {
         if (_request != null)
             return _request;
@@ -379,9 +376,18 @@
     public String toString() {
         return "MEX[" + _mexId + "]";
     }
+    
+    protected void assertTransaction() {
+        if (!_contexts.scheduler.isTransacted())
+            throw new BpelEngineException("Operation must be performed in a 
transaction!");
+    }
 
-    protected <T> T doInDb(InDbAction<T> name) {
-        throw new UnsupportedOperationException();
+    protected <T> T doInDb(InDbAction<T> action) {
+        if (_txflag) {
+            MessageExchangeDAO mexDao;
+            action.call(mexDao);
+        } else {
+        }
     }
 
     interface InDbAction<T> {

Modified: 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=550650&r1=550649&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
 Mon Jun 25 17:58:15 2007
@@ -83,7 +83,7 @@
      * @param mex
      *            exchange to which the message is related
      */
-    public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
+    public void invokeMyRole(ReliableMyRoleMessageExchangeImpl mex) {
         if (__log.isTraceEnabled()) {
             __log.trace(ObjectPrinter.stringifyMethodEnter(this + 
":inputMsgRcvd", new Object[] {
                     "messageExchange", mex }));
@@ -266,7 +266,7 @@
         return op;
     }
 
-    private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl 
mex) {
+    private CorrelationKey[] 
computeCorrelationKeys(ReliableMyRoleMessageExchangeImpl mex) {
         Operation operation = mex.getOperation();
         Element msg = mex.getRequest().getMessage();
         javax.wsdl.Message msgDescription = operation.getInput().getMessage();

Copied: 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
 (from r550560, 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java)
URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=550650&p1=incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java&r1=550560&p2=incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java&r2=550650
==============================================================================
--- 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 (original)
+++ 
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
 Mon Jun 25 17:58:15 2007
@@ -21,12 +21,15 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
 import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -41,25 +44,49 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements 
MyRoleMessageExchange {
+/**
+ * Provides an implementation of the [EMAIL PROTECTED] MyRoleMessageExchange} 
inteface for interactions performed in the
+ * [EMAIL PROTECTED] InvocationStyle#RELIABLE} style.
+ * 
+ * @author Maciej Szefler
+ */
+class ReliableMyRoleMessageExchangeImpl extends MessageExchangeImpl implements 
MyRoleMessageExchange {
+
+    private static final Log __log = 
LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
 
-    private static final Log __log = 
LogFactory.getLog(MyRoleMessageExchangeImpl.class);
     public static final int TIMEOUT = 2 * 60 * 1000;
 
-    private static Map<String, ResponseFuture> _waitingFutures =
-            new ConcurrentHashMap<String, ResponseFuture>();
+    private static Map<String, ResponseFuture> _waitingFutures = new 
ConcurrentHashMap<String, ResponseFuture>();
+
+    private CorrelationStatus _cstatus;
+
+    private String _clientId;
 
+    public ReliableMyRoleMessageExchangeImpl(BpelEngineImpl engine, String 
mexId) {
+        super(engine, mexId);
 
-    public MyRoleMessageExchangeImpl() {
-        super(engine, mexdao);
+        // RELIABLE means we are bound to a transaction
+        _txflag = true;
     }
 
     public CorrelationStatus getCorrelationStatus() {
-        return CorrelationStatus.valueOf(getDAO().getCorrelationStatus());
+        return _cstatus;
     }
 
-    void setCorrelationStatus(CorrelationStatus status) {
-        getDAO().setCorrelationStatus(status.toString());
+    @Override
+    void load(MessageExchangeDAO dao) {
+        super.load(dao);
+        if (_cstatus == null)
+            _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
+        if (_clientId == null)
+            _clientId = dao.getCorrelationId();
+    }
+
+    @Override
+    public void save(MessageExchangeDAO dao) {
+        super.save(dao);
+        dao.setCorrelationStatus(_cstatus.toString());
+        dao.setCorrelationId(_clientId);
     }
 
     /**
@@ -67,21 +94,19 @@
      * 
      * @param mex
      *            message exchange
-     * @return <code>true</code> if execution should continue,
-     *         <code>false</code> otherwise
+     * @return <code>true</code> if execution should continue, 
<code>false</code> otherwise
      */
-    private boolean processInterceptors(MyRoleMessageExchangeImpl mex, 
InterceptorInvoker invoker) {
-        InterceptorContextImpl ictx = new 
InterceptorContextImpl(_engine._contexts.dao.getConnection(), 
-                mex._dao.getProcess(), null);
+    private boolean processInterceptors(InterceptorInvoker invoker, 
MessageExchangeDAO mexDao) {
+        InterceptorContextImpl ictx = new 
InterceptorContextImpl(_engine._contexts.dao.getConnection(), 
mexDao.getProcess(), null);
 
         for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
-            if (!processInterceptor(i, mex, ictx, invoker))
+            if (!processInterceptor(i, this, ictx, invoker))
                 return false;
 
         return true;
     }
 
-    boolean processInterceptor(MessageExchangeInterceptor i, 
MyRoleMessageExchangeImpl mex, InterceptorContext ictx,
+    boolean processInterceptor(MessageExchangeInterceptor i, 
ReliableMyRoleMessageExchangeImpl mex, InterceptorContext ictx,
             InterceptorInvoker invoker) {
         __log.debug(invoker + "--> interceptor " + i);
         try {
@@ -99,40 +124,54 @@
         return true;
     }
 
-    public Future<MessageExchange.Status> invoke(Message request) {
+    public Future<MessageExchange.Status> invoke(final Message request) {
         if (request == null) {
             String errmsg = "Must pass non-null message to invoke()!";
-            __log.fatal(errmsg);
             throw new NullPointerException(errmsg);
         }
 
-        _dao.setRequest(((MessageImpl) request)._dao);
-        setStatus(MessageExchange.Status.REQUEST);
-
-        if (!processInterceptors(this, 
InterceptorInvoker.__onBpelServerInvoked)) {
-            throw new BpelEngineException("Intercepted.");
-        }
-
-        BpelProcess target = _engine.route(getDAO().getCallee(), request);
-
-        if (__log.isDebugEnabled())
-            __log.debug("invoke() EPR= " + _epr + " ==> " + target);
+        // For reliable, we MUST HAVE A TRANSACTION!
+        assertTransaction();
 
-        
-        ResponseFuture future = new ResponseFuture();
-        
+        BpelProcess target = _engine.route(_callee, request);
         if (target == null) {
             if (__log.isWarnEnabled())
                 __log.warn(__msgs.msgUnknownEPR("" + _epr));
 
-            
setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT);
+            ResponseFuture future = new ResponseFuture();
+
+            _cstatus = MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT;
             setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null, 
null);
-            future.done(_lastStatus);
-        } else {
+            future.done(_status);
+
+            return future;
+        }
+
+        doInDb(new InDbAction<Void>() {
+
+            public Void call(MessageExchangeDAO mexdao) {
+                // TODO: perhaps we should check if already backed by DB?
+                MessageDAO msgDao = mexdao.createMessage(request.getType());
+                msgDao.setData(request.getMessage());
+                setStatus(MessageExchange.Status.REQUEST);
+
+                if (!processInterceptors(this, 
InterceptorInvoker.__onBpelServerInvoked)) {
+                    throw new BpelEngineException("Intercepted.");
+                }
+
+                if (__log.isDebugEnabled())
+                    __log.debug("invoke() EPR= " + _epr + " ==> " + target);
+
+            }
+
+        });
+
+        {
             // Schedule a new job for invocation
             WorkEvent we = new WorkEvent();
             we.setType(WorkEvent.Type.INVOKE_INTERNAL);
-            if (target.isInMemory()) we.setInMem(true);
+            if (target.isInMemory())
+                we.setInMem(true);
             we.setProcessId(target.getPID());
             we.setMexId(getDAO().getMessageExchangeId());
 
@@ -144,7 +183,6 @@
                 future.done(_lastStatus);
             }
 
-
             if (target.isInMemory())
                 _engine._contexts.scheduler.scheduleVolatileJob(true, 
we.getDetail());
             else
@@ -158,16 +196,8 @@
     public void complete() {
     }
 
-    public QName getServiceName() {
-        return getDAO().getCallee();
-    }
-
-    public void setClientId(String clientKey) {
-        getDAO().setCorrelationId(clientKey);
-    }
-
     public String getClientId() {
-        return getDAO().getCorrelationId();
+        return _clientId;
     }
 
     public String toString() {
@@ -179,11 +209,6 @@
         }
     }
 
-    public boolean isAsynchronous() {
-        return true;
-    }
-
-
     protected void responseReceived() {
         final String mexid = getMessageExchangeId();
         _engine._contexts.scheduler.registerSynchronizer(new 
Scheduler.Synchronizer() {
@@ -192,18 +217,19 @@
                 ResponseFuture callback = _waitingFutures.remove(mexid);
                 callback.done(_lastStatus);
             }
+
             public void beforeCompletion() {
             }
         });
     }
-    
+
     private static class ResponseFuture implements Future<Status> {
         private Status _status;
 
         public boolean cancel(boolean mayInterruptIfRunning) {
             return false;
         }
-        
+
         public Status get() throws InterruptedException, ExecutionException {
             try {
                 return get(0, TimeUnit.MILLISECONDS);
@@ -212,22 +238,20 @@
                 throw new RuntimeException(e);
             }
         }
-        
-        public Status get(long timeout, TimeUnit unit) 
-            throws InterruptedException, ExecutionException, TimeoutException {
-            
-            
-            synchronized(this) {
+
+        public Status get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+
+            synchronized (this) {
                 if (_status != null)
                     return _status;
-                
+
                 while (_status == null) {
                     this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
                 }
-    
+
                 if (_status == null)
                     throw new TimeoutException();
-                
+
                 return _status;
             }
         }
@@ -235,13 +259,13 @@
         public boolean isCancelled() {
             return false;
         }
-        
+
         public boolean isDone() {
             return _status != null;
         }
-        
+
         void done(Status status) {
-            synchronized(this) {
+            synchronized (this) {
                 _status = status;
                 this.notifyAll();
             }


Reply via email to