Author: mszefler
Date: Wed Jun 20 10:40:23 2007
New Revision: 549166

URL: http://svn.apache.org/viewvc?view=rev&rev=549166
Log:
Merge issue with hydration latch.

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=549166&r1=549165&r2=549166 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Jun 20 10:40:23 2007 @@ -543,6 +543,17 @@ } + void hydrate() { + _hydrationLatch.latch(1); + + try { + // We don't actually need to do anything, the latch will run the doHydrate method + // when necessary.. + } finally { + _hydrationLatch.release(1); + } + } + OProcess getOProcess() { _hydrationLatch.latch(1); try { Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=549166&r1=549165&r2=549166 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Jun 20 10:40:23 2007 @@ -18,6 +18,15 @@ */ package org.apache.ode.bpel.engine; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.xml.namespace.QName; + import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; import org.apache.ode.bpel.dao.BpelDAOConnection; @@ -39,14 +48,8 @@ import org.apache.ode.bpel.intercept.MessageExchangeInterceptor; import org.apache.ode.bpel.o.OProcess; import org.apache.ode.utils.msg.MessageBundle; - -import javax.xml.namespace.QName; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ode.utils.stl.CollectionsX; +import org.apache.ode.utils.stl.MemberOfFunction; /** * <p> @@ -65,7 +68,7 @@ * @author Maciej Szefler <mszefler at gmail dot com> * @author Matthieu Riou <mriou at apache dot org> */ -public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor, ProcessLifecycleCallback { +public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor { private static final Log __log = LogFactory.getLog(BpelServerImpl.class); private static final Messages __msgs = MessageBundle.getMessages(Messages.class); @@ -73,7 +76,11 @@ /** Maximum age of a process before it is quiesced */ private static Long __processMaxAge; - private final Set<BpelProcess> _runningProcesses = new HashSet<BpelProcess>(); + /** + * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. + * Guarded by _mngmtLock.writeLock(). 
+ */ + private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>(); private State _state = State.SHUTDOWN; private Contexts _contexts = new Contexts(); @@ -219,7 +226,6 @@ if (!registered) _mngmtLock.readLock().unlock(); } - assert registered; return _engine; } @@ -247,13 +253,10 @@ __log.debug("Registering process " + conf.getProcessId() + " with server."); - BpelProcess process = new BpelProcess(conf, null, this); + BpelProcess process = new BpelProcess(conf, null); _engine.registerProcess(process); - - synchronized(_runningProcesses) { - _runningProcesses.add(process); - } + _registeredProcesses.add(process); process.hydrate(); __log.info(__msgs.msgProcessRegistered(conf.getProcessId())); @@ -277,9 +280,7 @@ BpelProcess p = null; if (_engine != null) { _engine.unregisterProcess(pid); - synchronized(_runningProcesses) { - _runningProcesses.remove(p); - } + _registeredProcesses.remove(p); } __log.info(__msgs.msgProcessUnregistered(pid)); @@ -293,50 +294,6 @@ } /** - * If necessary, create an object in the data store to represent the - * process. We'll re-use an existing object if it already exists and matches - * the GUID. 
- */ - private void bounceProcessDAO(BpelDAOConnection conn, final QName pid, - final long version, final OProcess oprocess) { - __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")"); - try { - boolean create = true; - ProcessDAO old = conn.getProcess(pid); - if (old != null) { - __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid()); - if (oprocess.guid == null) { - // No guid, old version assume its good - create = false; - } else { - if (old.getGuid().equals(oprocess.guid)) { - // Guids match, no need to create - create = false; - } else { - // GUIDS dont match, delete and create new - String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " - + oprocess.guid + "; replacing."; - __log.debug(errmsg); - old.delete(); - } - } - } - - if (create) { - ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, (int)version); - for (String correlator : oprocess.getCorrelators()) { - newDao.addCorrelator(correlator); - } - } - } catch (BpelEngineException ex) { - throw ex; - } catch (Exception dce) { - __log.error("DbError", dce); - throw new BpelEngineException("DbError", dce); - } - } - - /** * Register a global message exchange interceptor. * @param interceptor message-exchange interceptor */ @@ -389,34 +346,7 @@ public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException { getEngine().onScheduledJob(jobInfo); } - - public void hydrated(final BpelProcess process) { - // Recreating the process DAO if the definition has changed, shouldn't do anything - // except after a redeploy - if (_contexts.scheduler.isTransacted() || process.isInMemory()) - bounceProcessDAO(process.isInMemory() ? 
_contexts.inMemDao.getConnection() : _contexts.dao.getConnection(), - process.getPID(), process._pconf.getVersion(), process.getOProcess()); - else { - try { - _db.exec(new BpelDatabase.Callable<Object>() { - public Object run(BpelDAOConnection conn) throws Exception { - bounceProcessDAO(conn, process.getPID(), - process._pconf.getVersion(), process.getOProcess()); - return null; - } - }); - } catch (Exception ex) { - String errmsg = "DbError"; - __log.error(errmsg, ex); - throw new BpelEngineException(errmsg, ex); - } - } - - synchronized (_runningProcesses) { - _runningProcesses.add(process); - } - } - + private class ProcessDefReaper implements Runnable { public void run() { __log.debug("Starting process definition reaper thread."); @@ -429,19 +359,20 @@ __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount); // Copying the runnning process list to avoid synchronization // problems and a potential mess if a policy modifies the list - List<BpelProcess> runningProcesses; - synchronized(_runningProcesses) { - runningProcesses = new ArrayList<BpelProcess>(_runningProcesses); - } + List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses); + CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() { + public boolean isMember(BpelProcess o) { + return !o.hintIsHydrated(); + } + + }); + // And the happy winners are... - List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(runningProcesses); + List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(candidates); // Bye bye for (BpelProcess process : ripped) { __log.debug("Dehydrating process " + process.getPID()); process.dehydrate(); - synchronized (_runningProcesses) { - _runningProcesses.remove(process); - } } } finally { _mngmtLock.writeLock().unlock();