Author: mszefler Date: Tue Jul 3 14:00:06 2007 New Revision: 552978 URL: http://svn.apache.org/viewvc?view=rev&rev=552978 Log: BART: BpelEngineImpl refactor/elimination
Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original) +++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Tue Jul 3 14:00:06 
2007 @@ -40,7 +40,12 @@ public MessageExchangeContextImpl(ODEServer server) { } - public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException { + + public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException { + throw new UnsupportedOperationException(); + } + + public void invokePartnerBlocking(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException { if (__log.isDebugEnabled()) __log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName()); @@ -48,14 +53,29 @@ if (__log.isDebugEnabled()) __log.debug("The service to invoke is the external service " + service); service.invoke(partnerRoleMessageExchange); + + } + + public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException { + throw new UnsupportedOperationException(); } + public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException { + throw new UnsupportedOperationException(); + } + + + public void onAsyncReply(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException { if (__log.isDebugEnabled()) __log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName()); // Nothing to do, no callback is necessary, the client just synchornizes itself with the // mex reply when invoking the engine. + } + + public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException { + // We don't support this yet, so not much to do here. 
} } Modified: incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java (original) +++ incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java Tue Jul 3 14:00:06 2007 @@ -260,4 +260,4 @@ void setFailureType(String failureType); -} + } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007 @@ -23,7 +23,7 @@ ResponseFuture _future; - public AsyncMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + public AsyncMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) { super(engine, mexId); } @@ -33,7 +33,7 @@ _future = new ResponseFuture(); - BpelProcess target = _engine.route(_callee, _request); + BpelProcess target = _server.route(_callee, _request); if (target == null) { if (__log.isWarnEnabled()) __log.warn(__msgs.msgUnknownEPR("" + _epr)); Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java URL: 
http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007 @@ -16,7 +16,7 @@ Future<Status> _future; boolean _done = false; - public BlockingMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + public BlockingMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) { super(engine, mexId); } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Jul 3 14:00:06 2007 @@ -8,7 +8,7 @@ * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -65,10 +65,6 @@ class BpelEngineImpl { private static final Log __log = LogFactory.getLog(BpelEngineImpl.class); - /** RNG, for delays */ - private Random _random = new Random(System.currentTimeMillis()); - - private static double _delayMean = 0; private static final Messages __msgs = MessageBundle.getMessages(Messages.class); @@ -81,18 +77,7 @@ _contexts = contexts; } - OProcess getOProcess(QName processId) { - BpelProcess process = _activeProcesses.get(processId); - - if (process == null) - return null; - - return process.getOProcess(); - } - public void processJob(WorkEvent we) throws BpelEngineException { - } - } private boolean checkRetry(final JobInfo jobInfo, Throwable t) { // TODO, better handling of failed jobs (put them in the DB perhaps?) @@ -138,39 +123,6 @@ // No more retries. return false; - } - - /** - * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially - * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable. - */ - private void debuggingDelay() { - // Do a delay for debugging purposes. 
- if (_delayMean != 0) - try { - long delay = randomExp(_delayMean); - // distribution - // with mean - // _delayMean - __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms."); - Thread.sleep(delay); - } catch (InterruptedException e) { - ; // ignore - } - } - - private long randomExp(double mean) { - double u = _random.nextDouble(); // Uniform - long delay = (long) (-Math.log(u) * mean); // Exponential - return delay; - } - - void fireEvent(BpelEvent event) { - // Note that the eventListeners list is a copy-on-write array, so need - // to mess with synchronization. - for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) { - l.onEvent(event); - } } /** Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul 3 14:00:06 2007 @@ -19,11 +19,14 @@ package org.apache.ode.bpel.engine; import java.io.InputStream; +import java.sql.Date; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -44,7 +47,6 @@ import org.apache.ode.bpel.explang.ConfigurationException; import org.apache.ode.bpel.explang.EvaluationException; import org.apache.ode.bpel.iapi.BpelEngineException; -import org.apache.ode.bpel.iapi.ContextException; import org.apache.ode.bpel.iapi.Endpoint; import 
org.apache.ode.bpel.iapi.EndpointReference; import org.apache.ode.bpel.iapi.InvocationStyle; @@ -53,8 +55,10 @@ import org.apache.ode.bpel.iapi.ProcessConf; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern; +import org.apache.ode.bpel.iapi.MessageExchange.Status; import org.apache.ode.bpel.intercept.InterceptorInvoker; import org.apache.ode.bpel.intercept.MessageExchangeInterceptor; +import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl; import org.apache.ode.bpel.o.OElementVarType; import org.apache.ode.bpel.o.OExpressionLanguage; import org.apache.ode.bpel.o.OMessageVarType; @@ -68,6 +72,7 @@ import org.apache.ode.jacob.soup.ReplacementMap; import org.apache.ode.utils.ObjectPrinter; import org.apache.ode.utils.msg.MessageBundle; +import org.omg.CosNaming.IstringHelper; import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; @@ -76,7 +81,7 @@ /** * Entry point into the runtime of a BPEL process. * - * @author Maciej Szefler + * @author Maciej Szefler * @author Matthieu Riou <mriou at apache dot org> */ public class BpelProcess { @@ -108,8 +113,6 @@ /** Last time the process was used. */ private volatile long _lastUsed; - BpelEngineImpl _engine; - DebuggerSupport _debugger; ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry; @@ -125,14 +128,33 @@ private HydrationLatch _hydrationLatch; private Contexts _contexts; - + /** Manage instance-level locks. 
*/ private final InstanceLockManager _instanceLockManager = new InstanceLockManager(); + private final Set<InvocationStyle> _invocationStyles; + + private BpelDAOConnectionFactoryImpl _inMemDao; + + private Random _random = new Random(); + + private BpelServerImpl _server; + public BpelProcess(ProcessConf conf, BpelEventListener debugger) { _pid = conf.getProcessId(); _pconf = conf; _hydrationLatch = new HydrationLatch(); + _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.scheduler); + + // TODO : do this on a per-partnerlink basis, support transacted styles. + HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>(); + istyles.add(InvocationStyle.BLOCKING); + if (!conf.isTransient()) { + istyles.add(InvocationStyle.ASYNC); + istyles.add(InvocationStyle.RELIABLE); + } + + _invocationStyles = Collections.unmodifiableSet(istyles); } public String toString() { @@ -159,41 +181,37 @@ * * @param mex */ - void invokeProcess(MyRoleMessageExchangeImpl mex) { + void invokeProcess(MessageExchangeDAO mexdao) { _hydrationLatch.latch(1); try { - MessageExchangeDAO mexdao = getDAO(mex); - - PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName()); + PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee()); if (target == null) { - String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId()); + String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId()); __log.error(errmsg); - mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null); + mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString()); + mexdao.setFaultExplanation(errmsg); + mexdao.setStatus(Status.FAILURE.toString()); return; } - getDAO(mex).setProcess(getProcessDAO()); + mexdao.setProcess(getProcessDAO()); - if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) { - __log.debug("Aborting processing of mex " + mex + " due to interceptors."); - return; - } + // TODO: fix this + // if 
(!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) { + // __log.debug("Aborting processing of mex " + mex + " due to interceptors."); + // return; + // } markused(); - target.invokeMyRole(mex); - markused(); + target.invokeMyRole(mexdao); } finally { _hydrationLatch.release(1); } - // For a one way, once the engine is done, the mex can be safely released. - if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) { - mex.release(); - } - } - - private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) { - + // TODO: relocate this code // For a one way, once the engine is done, the mex can be safely released. + // if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) { + // mex.release(); + // } } private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) { @@ -291,59 +309,40 @@ * @return <code>true</code> if execution should continue, <code>false</code> otherwise */ boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) { - InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf); - - for (MessageExchangeInterceptor i : _mexInterceptors) - if (!mex.processInterceptor(i, mex, ictx, invoker)) - return false; - for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors()) - if (!mex.processInterceptor(i, mex, ictx, invoker)) - return false; - + // InterceptorContextImpl ictx = new InterceptorContextImpl(_contexts.dao.getConnection(), getProcessDAO(), _pconf); + // + // for (MessageExchangeInterceptor i : _mexInterceptors) + // if (!mex.processInterceptor(i, mex, ictx, invoker)) + // return false; + // for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors()) + // if (!mex.processInterceptor(i, mex, ictx, invoker)) + // return false; + // return true; } - /* - - // DONT PUT CODE HERE-need this method real tight in a try/catch block, we 
need to handle - // all types of failure here, the scheduler is not going to know how to handle our errors, - // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come - // to a grinding halt. - try { - - ProcessInstanceDAO instance; - if (process.isInMemory()) - instance = _contexts.inMemDao.getConnection().getInstance(we.getIID()); - else - instance = _contexts.dao.getConnection().getInstance(we.getIID()); - - if (instance == null) { - __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); - // nothing we can do, this instance is not in the database, it will - // always - // fail. - return; - } - ProcessDAO processDao = instance.getProcess(); - process = _activeProcesses.get(processDao.getProcessId()); - - process.handleWorkEvent(we.getDetail()); - debuggingDelay(); - } catch (BpelEngineException bee) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); - throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee)); - } catch (ContextException ce) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce); - throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); - } catch (RuntimeException rte) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); - throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte)); - } catch (Throwable t) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); - throw new Scheduler.JobProcessorException(false); - - + * // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle // all types of failure here, the + * scheduler is not going to know how to handle our errors, // ALSO we have to release the lock obtained above (IMPORTANT), lest + * the whole system come // to a grinding halt. 
try { + * + * ProcessInstanceDAO instance; if (process.isInMemory()) instance = + * _contexts.inMemDao.getConnection().getInstance(we.getIID()); else instance = + * _contexts.dao.getConnection().getInstance(we.getIID()); + * + * if (instance == null) { __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); // nothing we can do, this + * instance is not in the database, it will // always // fail. return; } ProcessDAO processDao = instance.getProcess(); process = + * _activeProcesses.get(processDao.getProcessId()); + * + * process.handleWorkEvent(we.getDetail()); debuggingDelay(); } catch (BpelEngineException bee) { + * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); throw new Scheduler.JobProcessorException(bee, + * checkRetry(jobInfo, bee)); } catch (ContextException ce) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce); + * throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); } catch (RuntimeException rte) { + * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); throw new Scheduler.JobProcessorException(rte, + * checkRetry(jobInfo, rte)); } catch (Throwable t) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); throw new + * Scheduler.JobProcessorException(false); + * + * */ /** * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>) @@ -364,9 +363,8 @@ if (__log.isDebugEnabled()) { __log.debug("InvokeInternal event for mexid " + we.getMexId()); } - ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we - .getMexId()); - invokeProcess(mex); + MessageExchangeDAO mexdao = loadMexDao(we.getMexId()); + invokeProcess(mexdao); } else { // Instance level events // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks, @@ -390,7 +388,7 @@ __log.debug("Instance " + we.getIID() + " is busy, rescheduling job."); // TODO: This should really be 
more of something like the exponential backoff algorithm in ethernet. _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis() - + Math.min(randomExp(1000), 10000))); + + _random.nextInt(1000))); return; } @@ -435,6 +433,11 @@ } } + private MessageExchangeDAO loadMexDao(String mexId) { + return isInMemory() ? _inMemDao.getConnection().getMessageExchange(mexId) : _contexts.dao.getConnection() + .getMessageExchange(mexId); + } + private void setRoles(OProcess oprocess) { _partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>(); _myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>(); @@ -485,8 +488,7 @@ } ProcessDAO getProcessDAO() { - return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : getEngine()._contexts.dao - .getConnection().getProcess(_pid); + return isInMemory() ? _inMemDao.getConnection().getProcess(_pid) : _contexts.dao.getConnection().getProcess(_pid); } static String genCorrelatorId(OPartnerLink plink, String opName) { @@ -539,7 +541,7 @@ void deactivate() { // Deactivate all the my-role endpoints. 
for (Endpoint endpoint : _myEprs.keySet()) - _engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint); + _contexts.bindingContext.deactivateMyRoleEndpoint(endpoint); // TODO Deactivate all the partner-role channels } @@ -653,10 +655,10 @@ MyRoleMessageExchangeImpl createMyRoleMex(MessageExchangeDAO mexdao) { InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle()); - + _hydrationLatch.latch(1); try { - MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId()); + MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId()); OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId()); PortType ptype = plink.myRolePortType; Operation op = plink.getMyRoleOperation(mexdao.getOperation()); @@ -667,7 +669,7 @@ } } - PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) { + PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) { InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle()); PartnerRoleMessageExchangeImpl mex; _hydrationLatch.latch(1); @@ -677,24 +679,27 @@ Operation op = plink.getPartnerRoleOperation(mexdao.getOperation()); switch (istyle) { case BLOCKING: - mex = new BlockingPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */ - plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); + mex = new BlockingPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), + null, /* EPR todo */ + plink.hasMyRole() ? 
getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); break; case ASYNC: - mex = new AsyncPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), + mex = new AsyncPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); break; case TRANSACTED: - mex = new TransactedPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */ - plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); + mex = new TransactedPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), + null, /* EPR todo */ + plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); break; case RELIABLE: - mex = new ReliablePartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */ - plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); + mex = new ReliablePartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), + null, /* EPR todo */ + plink.hasMyRole() ? 
getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink)); break; - + default: throw new BpelEngineException("Unexpected InvocationStyle: " + istyle); @@ -706,6 +711,10 @@ } + Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) { + return _invocationStyles; + } + private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() { _hydrationLatch.latch(1); try { @@ -724,10 +733,6 @@ } } - BpelEngineImpl getEngine() { - return _engine; - } - public boolean isInMemory() { return _pconf.isTransient(); } @@ -853,7 +858,7 @@ if (!_hydratedOnce) { for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) { - PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid, + PartnerRoleChannel channel = _contexts.bindingContext.createPartnerRoleChannel(_pid, prole._plinkDef.partnerRolePortType, prole._initialPartner); prole._channel = channel; _partnerChannels.put(prole._initialPartner, prole._channel); @@ -880,16 +885,16 @@ } if (isInMemory()) { - bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess); - } else if (_engine._contexts.scheduler.isTransacted()) { + bounceProcessDAO(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess); + } else if (_contexts.scheduler.isTransacted()) { // If we have a transaction, we do this in the current transaction. - bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess); + bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess); } else { // If we do not have a transaction we need to create one. 
try { - _engine._contexts.scheduler.execIsolatedTransaction(new Callable<Object>() { + _contexts.scheduler.execIsolatedTransaction(new Callable<Object>() { public Object call() throws Exception { - bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess); + bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess); return null; } }); @@ -902,4 +907,13 @@ } } + + MessageExchangeDAO createMessageExchange(final char dir) { + if (isInMemory()) { + return _inMemDao.getConnection().createMessageExchange(dir); + } else { + return _contexts.dao.getConnection().createMessageExchange(dir); + } + } + } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jul 3 14:00:06 2007 @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Random; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.Callable; @@ -93,24 +94,28 @@ /** Maximum age of a process before it is quiesced */ private static Long __processMaxAge; + /** RNG, for delays */ + private Random _random = new Random(System.currentTimeMillis()); + + private static double _delayMean = 0; + /** * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by * _mngmtLock.writeLock(). 
*/ private final HashMap<QName, BpelProcess> _registeredProcesses = new HashMap<QName, BpelProcess>(); - /** Mapping from myrole endpoint name to active process. */ - private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>(); + /** Mapping from myrole service name to active process. */ + private final HashMap<QName, BpelProcess> _serviceMap = new HashMap<QName, BpelProcess>(); private State _state = State.SHUTDOWN; - private Contexts _contexts = new Contexts(); + Contexts _contexts = new Contexts(); private DehydrationPolicy _dehydrationPolicy; private Properties _configProperties; - BpelDatabase _db; /** @@ -273,9 +278,9 @@ for (Endpoint e : process.getServiceNames()) { __log.debug("Register process: serviceId=" + e + ", process=" + process); - _serviceMap.put(e, process); + _serviceMap.put(e.serviceName, process); } - + process.activate(_contexts); _registeredProcesses.put(process.getPID(), process); @@ -350,15 +355,12 @@ // TODO: use the message to route to the correct service if more than // one service is listening on the same endpoint. 
- BpelProcess routed = null; - for (Endpoint endpoint : _serviceMap.keySet()) { - if (endpoint.serviceName.equals(service)) - routed = _serviceMap.get(endpoint); - } - if (__log.isDebugEnabled()) - __log.debug("Routed: svcQname " + service + " --> " + routed); - return routed; - + _mngmtLock.readLock().lock(); + try { + return _serviceMap.get(service); + } finally { + _mngmtLock.readLock().unlock(); + } } /** @@ -446,10 +448,6 @@ _contexts.dao = daoCF; } - public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) { - _contexts.inMemDao = daoCF; - } - public void setBindingContext(BindingContext bc) { _contexts.bindingContext = bc; } @@ -467,13 +465,7 @@ Callable<String> createDao = new Callable<String>() { public String call() throws Exception { - MessageExchangeDAO dao; - if (target.isInMemory()) { - dao = _contexts.inMemDao.getConnection().createMessageExchange( - MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); - } else { - dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); - } + MessageExchangeDAO dao = target.createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE); dao.setInvocationStyle(istyle.toString()); dao.setCorrelationId(clientKey); dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString()); @@ -554,7 +546,7 @@ return null; ProcessDAO pdao = mexdao.getProcess(); - BpelProcess process = pdao == null ? null : _engine._activeProcesses.get(pdao.getProcessId()); + BpelProcess process = pdao == null ? 
null : _registeredProcesses.get(pdao.getProcessId()); if (process == null) { String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId()); @@ -607,23 +599,73 @@ _mngmtLock.readLock().lock(); try { - + BpelProcess process = _serviceMap.get(serviceId); + if (process == null) + throw new BpelEngineException("No such service: " + serviceId); + return process.getSupportedInvocationStyle(serviceId); } finally { _mngmtLock.readLock().unlock(); } } - void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) { WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener); } + OProcess getOProcess(QName processId) { + _mngmtLock.readLock().lock(); + try { + BpelProcess process = _registeredProcesses.get(processId); + + if (process == null) + return null; + + return process.getOProcess(); + + } finally { + _mngmtLock.readLock().unlock(); + } + } + protected void assertTransaction() { if (!_contexts.scheduler.isTransacted()) throw new BpelEngineException("Operation must be performed in a transaction!"); } + void fireEvent(BpelEvent event) { + // Note that the eventListeners list is a copy-on-write array, so need + // to mess with synchronization. + for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) { + l.onEvent(event); + } + } + + /** + * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially + * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable. + */ + private void debuggingDelay() { + // Do a delay for debugging purposes. 
+ if (_delayMean != 0) + try { + long delay = randomExp(_delayMean); + // distribution + // with mean + // _delayMean + __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms."); + Thread.sleep(delay); + } catch (InterruptedException e) { + ; // ignore + } + } + + private long randomExp(double mean) { + double u = _random.nextDouble(); // Uniform + long delay = (long) (-Math.log(u) * mean); // Exponential + return delay; + } + private class ProcessDefReaper implements Runnable { public void run() { __log.debug("Starting process definition reaper thread."); @@ -661,6 +703,4 @@ } } - - } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue Jul 3 14:00:06 2007 @@ -45,7 +45,6 @@ BindingContext bindingContext; BpelDAOConnectionFactory dao; - BpelDAOConnectionFactory inMemDao; /** Global Message-Exchange interceptors. Must be copy-on-write!!! 
*/ final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>(); Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul 3 14:00:06 2007 @@ -93,7 +93,7 @@ Contexts _contexts; - BpelEngineImpl _engine; + BpelServerImpl _server; boolean _associated; @@ -121,9 +121,9 @@ private Set<String> _propNames; - public MessageExchangeImpl(BpelEngineImpl engine, String mexId) { + public MessageExchangeImpl(BpelServerImpl engine, String mexId) { _contexts = engine._contexts; - _engine = engine; + _server = engine; _mexId = mexId; } Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007 @@ -28,7 +28,7 @@ protected QName _callee; - public MyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + public MyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) { 
super(engine, mexId); } @@ -116,8 +116,8 @@ doInTX(new InDbAction<Void>() { public Void call(MessageExchangeDAO mexdao) { - _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null); - _engine._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null); + _server._contexts.scheduler.schedulePersistedJob(we.getDetail(), null); + _server._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null); return null; } @@ -134,9 +134,9 @@ * @return <code>true</code> if execution should continue, <code>false</code> otherwise */ protected boolean processInterceptors(InterceptorInvoker invoker, MessageExchangeDAO mexDao) { - InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), mexDao.getProcess(), null); + InterceptorContextImpl ictx = new InterceptorContextImpl(_server._contexts.dao.getConnection(), mexDao.getProcess(), null); - for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors()) + for (MessageExchangeInterceptor i : _server.getGlobalInterceptors()) if (!processInterceptor(i, this, ictx, invoker)) return false; Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007 @@ -57,7 +57,7 @@ public static final int TIMEOUT = 2 * 60 * 1000; - public ReliableMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + public ReliableMyRoleMessageExchangeImpl(BpelServerImpl 
engine, String mexId) { super(engine, mexId); // RELIABLE means we are bound to a transaction @@ -76,7 +76,7 @@ if (_status != Status.NEW) throw new BpelEngineException("Invalid state: " + _status); - final BpelProcess target = _engine.route(_callee, _request); + final BpelProcess target = _server.route(_callee, _request); if (target == null) { if (__log.isWarnEnabled()) __log.warn(__msgs.msgUnknownEPR("" + _epr)); Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original) +++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Tue Jul 3 14:00:06 2007 @@ -9,7 +9,7 @@ */ public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl { - public TransactedMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) { + public TransactedMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) { super(engine, mexId); } Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=552978&r1=552977&r2=552978 ============================================================================== --- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original) +++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Tue Jul 3 14:00:06 2007 @@ -1,4 +1,4 @@ -/* +at/* * Licensed to the Apache 
Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information