Author: mszefler
Date: Wed Jun 20 10:34:36 2007
New Revision: 549162

URL: http://svn.apache.org/viewvc?view=rev&rev=549162
Log:
Hydration latch.

Modified:
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=549162&r1=549161&r2=549162
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Wed Jun 20 10:34:36 2007
@@ -18,9 +18,21 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -54,15 +66,6 @@
 import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
 
-import javax.xml.namespace.QName;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Entry point into the runtime of a BPEL process.
  * 
@@ -75,47 +78,56 @@
     private static final Messages __msgs = 
MessageBundle.getMessages(Messages.class);
 
     private volatile Map<OPartnerLink, PartnerLinkPartnerRoleImpl> 
_partnerRoles;
+
     private volatile Map<OPartnerLink, PartnerLinkMyRoleImpl> _myRoles;
+
     /** Mapping from {"Service Name" (QNAME) / port} to a myrole. */
     private volatile Map<Endpoint, PartnerLinkMyRoleImpl> _endpointToMyRoleMap;
 
     // Backup hashmaps to keep initial endpoints handy after dehydration
-    private Map<Endpoint,EndpointReference> _myEprs =
-            new HashMap<Endpoint, EndpointReference>();
-    private Map<Endpoint,EndpointReference> _partnerEprs =
-            new HashMap<Endpoint, EndpointReference>();
-    private Map<Endpoint,PartnerRoleChannel> _partnerChannels =
-            new HashMap<Endpoint, PartnerRoleChannel>();
+    private Map<Endpoint, EndpointReference> _myEprs = new HashMap<Endpoint, 
EndpointReference>();
+
+    private Map<Endpoint, EndpointReference> _partnerEprs = new 
HashMap<Endpoint, EndpointReference>();
+
+    private Map<Endpoint, PartnerRoleChannel> _partnerChannels = new 
HashMap<Endpoint, PartnerRoleChannel>();
 
     final QName _pid;
+
     private volatile OProcess _oprocess;
+
     // Has the process already been hydrated before?
     private boolean _hydratedOnce = false;
+
     /** Last time the process was used. */
     private volatile long _lastUsed;
 
     BpelEngineImpl _engine;
+
     DebuggerSupport _debugger;
+
     ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
+
     private ReplacementMap _replacementMap;
+
     final ProcessConf _pconf;
-    // Notifying the server when a process hydrates
-    private ProcessLifecycleCallback _lifeCallback;
+
     /** {@link MessageExchangeInterceptor}s registered for this 
process. */
     private final List<MessageExchangeInterceptor> _mexInterceptors = new 
ArrayList<MessageExchangeInterceptor>();
 
-    public BpelProcess(ProcessConf conf, BpelEventListener debugger, 
ProcessLifecycleCallback lifeCallback) {
+    /** Latch-like thing to control hydration/dehydration. */
+    private HydrationLatch _hydrationLatch;
+
+    public BpelProcess(ProcessConf conf, BpelEventListener debugger) {
         _pid = conf.getProcessId();
         _pconf = conf;
-        _lifeCallback = lifeCallback;
+        _hydrationLatch = new HydrationLatch();
     }
 
     public String toString() {
         return "BpelProcess[" + _pid + "]";
     }
 
-    public void recoverActivity(ProcessInstanceDAO instanceDAO, String 
channel, long activityId,
-                                String action, FaultData fault) {
+    public void recoverActivity(ProcessInstanceDAO instanceDAO, String 
channel, long activityId, String action, FaultData fault) {
         if (__log.isDebugEnabled())
             __log.debug("Recovering activity in process " + 
instanceDAO.getInstanceId() + " with action " + action);
         markused();
@@ -132,27 +144,33 @@
 
     /**
      * Entry point for message exchanges aimed at the my role.
+     * 
      * @param mex
      */
     void invokeProcess(MyRoleMessageExchangeImpl mex) {
-        PartnerLinkMyRoleImpl target = 
getMyRoleForService(mex.getServiceName());
-        if (target == null) {
-            String errmsg = 
__msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
-            __log.error(errmsg);
-            mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, 
errmsg, null);
-            return;
-        }
+        _hydrationLatch.latch(1);
+        try {
+            PartnerLinkMyRoleImpl target = 
getMyRoleForService(mex.getServiceName());
+            if (target == null) {
+                String errmsg = 
__msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
+                __log.error(errmsg);
+                mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, 
errmsg, null);
+                return;
+            }
 
-        mex.getDAO().setProcess(getProcessDAO());
-
-        if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
-            __log.debug("Aborting processing of mex " + mex + " due to 
interceptors.");
-            return;
-        }
+            mex.getDAO().setProcess(getProcessDAO());
 
-        markused();
-        target.invokeMyRole(mex);
-        markused();
+            if (!processInterceptors(mex, 
InterceptorInvoker.__onProcessInvoked)) {
+                __log.debug("Aborting processing of mex " + mex + " due to 
interceptors.");
+                return;
+            }
+
+            markused();
+            target.invokeMyRole(mex);
+            markused();
+        } finally {
+            _hydrationLatch.release(1);
+        }
 
         // For a one way, once the engine is done, the mex can be safely 
released.
         if 
(mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
@@ -178,16 +196,19 @@
         if (target != null) {
             mex.setPortOp(target._plinkDef.myRolePortType, 
target._plinkDef.getMyRoleOperation(mex.getOperationName()));
         } else {
-            __log.warn("Couldn't find endpoint from service " + 
mex.getServiceName()
-                    + " when initializing a myRole mex.");
+            __log.warn("Couldn't find endpoint from service " + 
mex.getServiceName() + " when initializing a myRole mex.");
         }
     }
 
     /**
      * Extract the value of a BPEL property from a BPEL messsage variable.
-     * @param msgData message variable data
-     * @param alias alias to apply
-     * @param target description of the data (for error logging only)
+     * 
+     * @param msgData
+     *            message variable data
+     * @param alias
+     *            alias to apply
+     * @param target
+     *            description of the data (for error logging only)
      * @return value of the property
      * @throws FaultException
      */
@@ -230,16 +251,15 @@
     }
 
     /**
-     * Get the element name for a given WSDL part. If the part is an
-     * <em>element</em> part, the name of that element is returned. If the
-     * part is an XML schema typed part, then the name of the part is returned
-     * in the null namespace.
-     * @param part WSDL {@link javax.wsdl.Part}
+     * Get the element name for a given WSDL part. If the part is an 
<em>element</em> part, the name of that element is returned.
+     * If the part is an XML schema typed part, then the name of the part is 
returned in the null namespace.
+     * 
+     * @param part
+     *            WSDL {@link javax.wsdl.Part}
      * @return name of element containing said part
      */
     static QName getElementNameForPart(OMessageVarType.Part part) {
-        return (part.type instanceof OElementVarType) ? ((OElementVarType) 
part.type).elementType
-                : new QName(null, part.name);
+        return (part.type instanceof OElementVarType) ? ((OElementVarType) 
part.type).elementType : new QName(null, part.name);
     }
 
     /**
@@ -247,12 +267,10 @@
      * 
      * @param mex
      *            message exchange
-     * @return <code>true</code> if execution should continue,
-     *         <code>false</code> otherwise
+     * @return <code>true</code> if execution should continue, 
<code>false</code> otherwise
      */
     boolean processInterceptors(MyRoleMessageExchangeImpl mex, 
InterceptorInvoker invoker) {
-        InterceptorContextImpl ictx = new 
InterceptorContextImpl(getEngine()._contexts.dao.getConnection(),
-                getProcessDAO(), _pconf);
+        InterceptorContextImpl ictx = new 
InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), 
_pconf);
 
         for (MessageExchangeInterceptor i : _mexInterceptors)
             if (!mex.processInterceptor(i, mex, ictx, invoker))
@@ -264,64 +282,68 @@
         return true;
     }
 
-
     /**
      * @see 
org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
      */
     public void handleWorkEvent(Map<String, Object> jobData) {
-        markused();
-
-        if (__log.isDebugEnabled()) {
-            __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", 
new Object[] { "jobData", jobData }));
-        }
-
-        WorkEvent we = new WorkEvent(jobData);
+        _hydrationLatch.latch(1);
+        try {
+            markused();
 
-        // Process level events
-        if (we.getType().equals(WorkEvent.Type.INVOKE_INTERNAL)) {
             if (__log.isDebugEnabled()) {
-                __log.debug("InvokeInternal event for mexid " + we.getMexId());
-            }
-            MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) 
getEngine().getMessageExchange(we.getMexId());
-            if (mex == null) throw new ContextException("Unable to find MEX " 
+ we.getMexId());
-            invokeProcess(mex);
-        } else {
-            // Instance level events
-            ProcessInstanceDAO procInstance = 
getProcessDAO().getInstance(we.getIID());
-            if (procInstance == null) {
-                if (__log.isDebugEnabled()) {
-                    __log.debug("handleWorkEvent: no ProcessInstance found 
with iid " + we.getIID() + "; ignoring.");
-                }
-                return;
+                
__log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] 
{ "jobData", jobData }));
             }
 
-            BpelRuntimeContextImpl processInstance = 
createRuntimeContext(procInstance, null, null);
-            switch (we.getType()) {
-            case TIMER:
-                if (__log.isDebugEnabled()) {
-                    __log.debug("handleWorkEvent: TimerWork event for process 
instance " + processInstance);
-                }
-                processInstance.timerEvent(we.getChannel());
-                break;
-            case RESUME:
+
+            WorkEvent we = new WorkEvent(jobData);
+
+            // Process level events
+            if (we.getType().equals(WorkEvent.Type.INVOKE_INTERNAL)) {
                 if (__log.isDebugEnabled()) {
-                    __log.debug("handleWorkEvent: ResumeWork event for iid " + 
we.getIID());
+                    __log.debug("InvokeInternal event for mexid " + 
we.getMexId());
                 }
-                processInstance.execute();
-                break;
-            case INVOKE_RESPONSE:
-                if (__log.isDebugEnabled()) {
-                    __log.debug("InvokeResponse event for iid " + we.getIID());
+                MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) 
_engine.getMessageExchange(we.getMexId());
+                invokeProcess(mex);
+            } else {
+                // Instance level events
+                ProcessInstanceDAO procInstance = 
getProcessDAO().getInstance(we.getIID());
+                if (procInstance == null) {
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("handleWorkEvent: no ProcessInstance found 
with iid " + we.getIID() + "; ignoring.");
+                    }
+                    return;
                 }
-                processInstance.invocationResponse(we.getMexId(), 
we.getChannel());
-                processInstance.execute();
-                break;
-            case MATCHER:
-                if (__log.isDebugEnabled()) {
-                    __log.debug("Matcher event for iid " + we.getIID());
+
+                BpelRuntimeContextImpl processInstance = 
createRuntimeContext(procInstance, null, null);
+                switch (we.getType()) {
+                case TIMER:
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("handleWorkEvent: TimerWork event for 
process instance " + processInstance);
+                    }
+                    processInstance.timerEvent(we.getChannel());
+                    break;
+                case RESUME:
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("handleWorkEvent: ResumeWork event for iid 
" + we.getIID());
+                    }
+                    processInstance.execute();
+                    break;
+                case INVOKE_RESPONSE:
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("InvokeResponse event for iid " + 
we.getIID());
+                    }
+                    processInstance.invocationResponse(we.getMexId(), 
we.getChannel());
+                    processInstance.execute();
+                    break;
+                case MATCHER:
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("Matcher event for iid " + we.getIID());
+                    }
+                    processInstance.matcherEvent(we.getCorrelatorId(), 
we.getCorrelationKey());
                 }
-                processInstance.matcherEvent(we.getCorrelatorId(), 
we.getCorrelationKey());
             }
+        } finally {
+            _hydrationLatch.release(1);
         }
     }
 
@@ -335,8 +357,8 @@
         for (Map.Entry<String, Endpoint> provide : 
_pconf.getProvideEndpoints().entrySet()) {
             OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
             if (plink == null) {
-                String errmsg = "Error in deployment descriptor for process " 
+ _pid
-                        + "; reference to unknown partner link " + 
provide.getKey();
+                String errmsg = "Error in deployment descriptor for process " 
+ _pid + "; reference to unknown partner link "
+                        + provide.getKey();
                 __log.error(errmsg);
                 throw new BpelEngineException(errmsg);
             }
@@ -347,13 +369,13 @@
         for (Map.Entry<String, Endpoint> invoke : 
_pconf.getInvokeEndpoints().entrySet()) {
             OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
             if (plink == null) {
-                String errmsg = "Error in deployment descriptor for process " 
+ _pid
-                        + "; reference to unknown partner link " + 
invoke.getKey();
+                String errmsg = "Error in deployment descriptor for process " 
+ _pid + "; reference to unknown partner link "
+                        + invoke.getKey();
                 __log.error(errmsg);
                 throw new BpelEngineException(errmsg);
             }
-            __log.debug("Processing <invoke> element for process " + _pid + ": 
partnerlink " + invoke.getKey()
-                    + " --> " + invoke.getValue());
+            __log.debug("Processing <invoke> element for process " + _pid + ": 
partnerlink " + invoke.getKey() + " --> "
+                    + invoke.getValue());
         }
 
         for (OPartnerLink pl : oprocess.getAllPartnerLinks()) {
@@ -367,16 +389,16 @@
             }
 
             if (pl.hasPartnerRole()) {
-                PartnerLinkPartnerRoleImpl partnerRole = new 
PartnerLinkPartnerRoleImpl(
-                        this, pl, 
_pconf.getInvokeEndpoints().get(pl.getName()));
+                PartnerLinkPartnerRoleImpl partnerRole = new 
PartnerLinkPartnerRoleImpl(this, pl, _pconf.getInvokeEndpoints().get(
+                        pl.getName()));
                 _partnerRoles.put(pl, partnerRole);
             }
         }
     }
 
     ProcessDAO getProcessDAO() {
-        return _pconf.isTransient() ? 
getEngine()._contexts.inMemDao.getConnection().getProcess(_pid)
-                : getEngine()._contexts.dao.getConnection().getProcess(_pid);
+        return _pconf.isTransient() ? 
_engine._contexts.inMemDao.getConnection().getProcess(_pid) : 
getEngine()._contexts.dao
+                .getConnection().getProcess(_pid);
     }
 
     static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -385,7 +407,7 @@
 
     /**
      * De-serialize the compiled process representation from a stream.
-     *
+     * 
      * @param is
      *            input stream
      * @return process information from configuration database
@@ -417,8 +439,7 @@
         __log.debug("Activating " + _pid);
         // Activate all the my-role endpoints.
         for (Map.Entry<String, Endpoint> entry : 
_pconf.getProvideEndpoints().entrySet()) {
-            EndpointReference initialEPR = getEngine()._contexts
-                    .bindingContext.activateMyRoleEndpoint(_pid, 
entry.getValue());
+            EndpointReference initialEPR = 
_engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, entry.getValue());
             __log.debug("Activated " + _pid + " myrole " + entry.getKey() + ": 
EPR is " + initialEPR);
             _myEprs.put(entry.getValue(), initialEPR);
         }
@@ -430,30 +451,45 @@
     void deactivate() {
         // Deactivate all the my-role endpoints.
         for (Endpoint endpoint : _myEprs.keySet())
-            
getEngine()._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
+            
_engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
 
-         // TODO Deactivate all the partner-role channels
+        // TODO Deactivate all the partner-role channels
     }
 
     EndpointReference getInitialPartnerRoleEPR(OPartnerLink link) {
-        PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(link);
-        if (prole == null)
-            throw new IllegalStateException("Unknown partner link " + link);
-        return prole.getInitialEPR();
+        _hydrationLatch.latch(1);
+        try {
+            PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link);
+            if (prole == null)
+                throw new IllegalStateException("Unknown partner link " + 
link);
+            return prole.getInitialEPR();
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     Endpoint getInitialPartnerRoleEndpoint(OPartnerLink link) {
-        PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(link);
-        if (prole == null)
-            throw new IllegalStateException("Unknown partner link " + link);
-        return prole._initialPartner;
+        _hydrationLatch.latch(1);
+        try {
+            PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link);
+            if (prole == null)
+                throw new IllegalStateException("Unknown partner link " + 
link);
+            return prole._initialPartner;
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     EndpointReference getInitialMyRoleEPR(OPartnerLink link) {
-        PartnerLinkMyRoleImpl myRole = getMyRoles().get(link);
-        if (myRole == null)
-            throw new IllegalStateException("Unknown partner link " + link);
-        return myRole.getInitialEPR();
+        _hydrationLatch.latch(1);
+        try {
+            PartnerLinkMyRoleImpl myRole = _myRoles.get(link);
+            if (myRole == null)
+                throw new IllegalStateException("Unknown partner link " + 
link);
+            return myRole.getInitialEPR();
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     QName getPID() {
@@ -461,10 +497,15 @@
     }
 
     PartnerRoleChannel getPartnerRoleChannel(OPartnerLink partnerLink) {
-        PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(partnerLink);
-        if (prole == null)
-            throw new IllegalStateException("Unknown partner link " + 
partnerLink);
-        return prole._channel;
+        _hydrationLatch.latch(1);
+        try {
+            PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink);
+            if (prole == null)
+                throw new IllegalStateException("Unknown partner link " + 
partnerLink);
+            return prole._channel;
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     public void saveEvent(ProcessInstanceEvent event, ProcessInstanceDAO 
instanceDao) {
@@ -476,8 +517,10 @@
 
         boolean enabled = _pconf.isEventEnabled(scopeNames, event.getType());
         if (enabled) {
-            if (instanceDao != null) saveInstanceEvent(event, instanceDao);
-            else __log.debug("Couldn't find instance to save event, no event 
generated!");
+            if (instanceDao != null)
+                saveInstanceEvent(event, instanceDao);
+            else
+                __log.debug("Couldn't find instance to save event, no event 
generated!");
         }
     }
 
@@ -485,98 +528,46 @@
         instanceDao.insertBpelEvent(event);
     }
 
+    /**
+     * Ask the process to dehydrate.
+     */
     void dehydrate() {
-        _oprocess = null;
-        _partnerRoles = null;
-        _myRoles = null;
-        _endpointToMyRoleMap = null;
-        _replacementMap = null;
-        _expLangRuntimeRegistry = null;
-    }
+        _hydrationLatch.latch(0);
 
-    void hydrate() {
-        markused();
-        __log.debug("Rehydrating process " + _pconf.getProcessId());
         try {
-            _oprocess = deserializeCompiledProcess(_pconf.getCBPInputStream());
-        } catch (Exception e) {
-            String errmsg = "Error reloading compiled process " + _pid + "; 
the file appears to be corrupted.";
-            __log.error(errmsg);
-            throw new BpelEngineException(errmsg, e);
-        }
-
-        _replacementMap = new ReplacementMapImpl(_oprocess);
-
-        // Create an expression language registry for this process
-        ExpressionLanguageRuntimeRegistry elangRegistry = new 
ExpressionLanguageRuntimeRegistry();
-        for (OExpressionLanguage elang : _oprocess.expressionLanguages) {
-            try {
-                elangRegistry.registerRuntime(elang);
-            } catch (ConfigurationException e) {
-                String msg = 
__msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, 
elang.properties);
-                __log.error(msg, e);
-                throw new BpelEngineException(msg, e);
-            }
+            // We don't actually need to do anything, the latch will run the 
doDehydrate method
+            // when necessary..
+        } finally {
+            _hydrationLatch.release(0);
         }
-        _expLangRuntimeRegistry = elangRegistry;
-
-        setRoles(getOProcess());
 
-        if (!_hydratedOnce) {
-            for (PartnerLinkPartnerRoleImpl prole : 
getPartnerRoles().values()) {
-                PartnerRoleChannel channel = 
getEngine()._contexts.bindingContext.createPartnerRoleChannel(_pid,
-                        prole._plinkDef.partnerRolePortType, 
prole._initialPartner);
-                prole._channel = channel;
-                _partnerChannels.put(prole._initialPartner, prole._channel);
-                EndpointReference epr = channel.getInitialEndpointReference();
-                if (epr != null) {
-                    prole._initialEPR = epr;
-                    _partnerEprs.put(prole._initialPartner, epr);
-                }
-                __log.debug("Activated " + _pid + " partnerrole " + 
prole.getPartnerLinkName() + ": EPR is "
-                        + prole._initialEPR);
-            }
-            _hydratedOnce = true;
-        }
-
-        for (PartnerLinkMyRoleImpl myrole : getMyRoles().values()) {
-            myrole._initialEPR = _myEprs.get(myrole._endpoint);
-        }
-
-        for (PartnerLinkPartnerRoleImpl prole : getPartnerRoles().values()) {
-            prole._channel = _partnerChannels.get(prole._initialPartner);
-            if (_partnerEprs.get(prole._initialPartner) != null) {
-                prole._initialEPR = _partnerEprs.get(prole._initialPartner);
-            }
-        }
-
-        _lifeCallback.hydrated(this);
     }
 
     OProcess getOProcess() {
-        if (_oprocess == null) hydrate();
-        return _oprocess;
-    }
-
-    public Map<OPartnerLink, PartnerLinkMyRoleImpl> getMyRoles() {
-        if (_myRoles == null) hydrate();
-        return _myRoles;
-    }
-
-    public Map<OPartnerLink, PartnerLinkPartnerRoleImpl> getPartnerRoles() {
-        if (_partnerRoles == null) hydrate();
-        return _partnerRoles;
+        _hydrationLatch.latch(1);
+        try {
+            return _oprocess;
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
-        if (_endpointToMyRoleMap == null) hydrate();
-        return _endpointToMyRoleMap;
+        _hydrationLatch.latch(1);
+        try {
+            return _endpointToMyRoleMap;
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     public ReplacementMap getReplacementMap() {
-        if (_replacementMap == null) hydrate();
-        assert _replacementMap != null;
-        return _replacementMap;
+        _hydrationLatch.latch(1);
+        try {
+            return _replacementMap;
+        } finally {
+            _hydrationLatch.release(1);
+        }
     }
 
     BpelEngineImpl getEngine() {
@@ -591,6 +582,13 @@
         return _lastUsed;
     }
 
+    /**
+     * Get a hint as to whether this process is hydrated. Note this is only a 
hint, since things could change.
+     */
+    public boolean hintIsHydrated() {
+        return _oprocess != null;
+    }
+
     /** Keep track of the time the process was last used. */
     private final void markused() {
         _lastUsed = System.currentTimeMillis();
@@ -600,5 +598,155 @@
     BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, 
PROCESS template,
             MyRoleMessageExchangeImpl instantiatingMessageExchange) {
         return new BpelRuntimeContextImpl(this, dao, template, 
instantiatingMessageExchange);
+
+    }
+
+    /**
+     * If necessary, create an object in the data store to represent the 
process. We'll re-use an existing object if it already
+     * exists and matches the GUID.
+     */
+    private void bounceProcessDAO(BpelDAOConnection conn, final QName pid, 
final long version, final OProcess oprocess) {
+        __log.debug("Creating process DAO for " + pid + " (guid=" + 
oprocess.guid + ")");
+        try {
+            boolean create = true;
+            ProcessDAO old = conn.getProcess(pid);
+            if (old != null) {
+                __log.debug("Found ProcessDAO for " + pid + " with GUID " + 
old.getGuid());
+                if (oprocess.guid == null) {
+                    // No guid, old version assume its good
+                    create = false;
+                } else {
+                    if (old.getGuid().equals(oprocess.guid)) {
+                        // Guids match, no need to create
+                        create = false;
+                    } else {
+                        // GUIDS dont match, delete and create new
+                        String errmsg = "ProcessDAO GUID " + old.getGuid() + " 
does not match " + oprocess.guid + "; replacing.";
+                        __log.debug(errmsg);
+                        old.delete();
+                    }
+                }
+            }
+
+            if (create) {
+                ProcessDAO newDao = conn.createProcess(pid, 
oprocess.getQName(), oprocess.guid, (int) version);
+                for (String correlator : oprocess.getCorrelators()) {
+                    newDao.addCorrelator(correlator);
+                }
+            }
+        } catch (BpelEngineException ex) {
+            throw ex;
+        } catch (Exception dce) {
+            __log.error("DbError", dce);
+            throw new BpelEngineException("DbError", dce);
+        }
+    }
+
+    private class HydrationLatch extends NStateLatch {
+
+        HydrationLatch() {
+            super(new Runnable[2]);
+            _transitions[0] = new Runnable() {
+                public void run() {
+                    doDehydrate();
+                }
+            };
+
+            _transitions[1] = new Runnable() {
+                public void run() {
+                    doHydrate();
+                }
+            };
+
+        }
+
+        private void doDehydrate() {
+            _oprocess = null;
+            _partnerRoles = null;
+            _myRoles = null;
+            _endpointToMyRoleMap = null;
+            _replacementMap = null;
+            _expLangRuntimeRegistry = null;
+        }
+
+        private void doHydrate() {
+            markused();
+            __log.debug("Rehydrating process " + _pconf.getProcessId());
+            try {
+                _oprocess = 
deserializeCompiledProcess(_pconf.getCBPInputStream());
+            } catch (Exception e) {
+                String errmsg = "Error reloading compiled process " + _pid + 
"; the file appears to be corrupted.";
+                __log.error(errmsg);
+                throw new BpelEngineException(errmsg, e);
+            }
+
+            _replacementMap = new ReplacementMapImpl(_oprocess);
+
+            // Create an expression language registry for this process
+            ExpressionLanguageRuntimeRegistry elangRegistry = new 
ExpressionLanguageRuntimeRegistry();
+            for (OExpressionLanguage elang : _oprocess.expressionLanguages) {
+                try {
+                    elangRegistry.registerRuntime(elang);
+                } catch (ConfigurationException e) {
+                    String msg = 
__msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, 
elang.properties);
+                    __log.error(msg, e);
+                    throw new BpelEngineException(msg, e);
+                }
+            }
+            _expLangRuntimeRegistry = elangRegistry;
+
+            setRoles(_oprocess);
+
+            if (!_hydratedOnce) {
+                for (PartnerLinkPartnerRoleImpl prole : 
_partnerRoles.values()) {
+                    PartnerRoleChannel channel = 
_engine._contexts.bindingContext.createPartnerRoleChannel(_pid,
+                            prole._plinkDef.partnerRolePortType, 
prole._initialPartner);
+                    prole._channel = channel;
+                    _partnerChannels.put(prole._initialPartner, 
prole._channel);
+                    EndpointReference epr = 
channel.getInitialEndpointReference();
+                    if (epr != null) {
+                        prole._initialEPR = epr;
+                        _partnerEprs.put(prole._initialPartner, epr);
+                    }
+                    __log.debug("Activated " + _pid + " partnerrole " + 
prole.getPartnerLinkName() + ": EPR is "
+                            + prole._initialEPR);
+                }
+                _hydratedOnce = true;
+            }
+
+            for (PartnerLinkMyRoleImpl myrole : _myRoles.values()) {
+                myrole._initialEPR = _myEprs.get(myrole._endpoint);
+            }
+
+            for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
+                prole._channel = _partnerChannels.get(prole._initialPartner);
+                if (_partnerEprs.get(prole._initialPartner) != null) {
+                    prole._initialEPR = 
_partnerEprs.get(prole._initialPartner);
+                }
+            }
+
+
+            if (isInMemory()) {
+                bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), 
_pid, _pconf.getVersion(), _oprocess);
+            } else if (_engine._contexts.scheduler.isTransacted()) {
+                // If we have a transaction, we do this in the current 
transaction.
+                bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, 
_pconf.getVersion(), _oprocess);
+            } else {
+                // If we do not have a transaction we need to create one. 
+                try {
+                    _engine._contexts.scheduler.execIsolatedTransaction(new 
Callable<Object>() {
+                        public Object call() throws Exception {
+                            
bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, 
_pconf.getVersion(), _oprocess);
+                            return null;
+                        }
+                    });
+                } catch (Exception ex) {
+                    String errmsg = "DbError";
+                    __log.error(errmsg, ex);
+                    throw new BpelEngineException(errmsg, ex);
+                }
+            }
+        }
+
     }
 }


Reply via email to