Author: mszefler Date: Wed Jan 10 09:28:20 2007 New Revision: 494901 URL: http://svn.apache.org/viewvc?view=rev&rev=494901 Log: Messages where the correlation keys were not found in the correct place were causing rollbacks. Fixed to cause message exchange failure.
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=494901&r1=494900&r2=494901 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Jan 10 09:28:20 2007 @@ -98,7 +98,8 @@ /** Last time the process was used. */ volatile long _lastUsed; - public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger, ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) { + public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger, + ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) { _pid = conf.getProcessId(); _pconf = conf; @@ -111,8 +112,8 @@ for (Map.Entry<String, Endpoint> provide : conf.getProvideEndpoints().entrySet()) { OPartnerLink plink = oprocess.getPartnerLink(provide.getKey()); if (plink == null) { - String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link " - + provide.getKey(); + String errmsg = "Error in deployment descriptor for process " + _pid + + "; reference to unknown partner link " + provide.getKey(); __log.error(errmsg); throw new BpelEngineException(errmsg); } @@ -124,13 +125,13 @@ for (Map.Entry<String, Endpoint> invoke : conf.getInvokeEndpoints().entrySet()) { OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey()); if (plink == null) { - String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link " - + invoke.getKey(); + String errmsg = "Error in deployment descriptor for process " + 
_pid + + "; reference to unknown partner link " + invoke.getKey(); __log.error(errmsg); throw new BpelEngineException(errmsg); } - __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey() + " --> " - + invoke.getValue()); + __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey() + + " --> " + invoke.getValue()); partnerRoleIntialValues.put(plink, invoke.getValue()); } @@ -146,7 +147,8 @@ } if (pl.hasPartnerRole()) { - PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints().get(pl.getName())); + PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints() + .get(pl.getName())); _partnerRoles.put(pl, partnerRole); } } @@ -156,7 +158,8 @@ return "BpelProcess[" + _pid + "]"; } - public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) { + public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, + FaultData fault) { if (__log.isDebugEnabled()) __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action); @@ -216,7 +219,8 @@ if (target != null) { mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName())); } else { - __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex."); + __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + + " when initializing a myRole mex."); } } @@ -271,15 +275,18 @@ } /** - * Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned. - * If the part is an XML schema typed part, then the name of the part is returned in the null namespace. + * Get the element name for a given WSDL part. 
If the part is an + * <em>element</em> part, the name of that element is returned. If the + * part is an XML schema typed part, then the name of the part is returned + * in the null namespace. * * @param part * WSDL {@link javax.wsdl.Part} * @return name of element containing said part */ static QName getElementNameForPart(OMessageVarType.Part part) { - return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name); + return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, + part.name); } /** Create a version-appropriate runtime context. */ @@ -293,10 +300,12 @@ * * @param mex * message exchange - * @return <code>true</code> if execution should continue, <code>false</code> otherwise + * @return <code>true</code> if execution should continue, + * <code>false</code> otherwise */ private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) { - InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(),_pconf); + InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), + getProcessDAO(), _pconf); for (MessageExchangeInterceptor i : _mexInterceptors) if (!mex.processInterceptor(i, mex, ictx, invoker)) @@ -310,7 +319,8 @@ } /** - * Replacement object for serializtation of the {@link OBase} (compiled BPEL) objects in the JACOB VPU. + * Replacement object for serializtation of the {@link OBase} (compiled + * BPEL) objects in the JACOB VPU. */ public static final class OBaseReplacementImpl implements Externalizable { private static final long serialVersionUID = 1L; @@ -348,8 +358,8 @@ } /** - * Get the initial value of this role's EPR. This value is obtained from the integration layer when the process is enabled - * on the server. + * Get the initial value of this role's EPR. 
This value is obtained from + * the integration layer when the process is enabled on the server. * * @return initial epr */ @@ -389,7 +399,8 @@ */ public void invokeMyRole(MyRoleMessageExchangeImpl mex) { if (__log.isTraceEnabled()) { - __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex })); + __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { + "messageExchange", mex })); } Operation operation = getMyRoleOperation(mex.getOperationName()); @@ -398,7 +409,7 @@ mex.setFailure(FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null); return; } - + mex.getDAO().setPartnerLinkModelId(_plinkDef.getId()); mex.setPortOp(_plinkDef.myRolePortType, operation); mex.setPattern(operation.getOutput() == null ? MessageExchangePattern.REQUEST_ONLY @@ -421,20 +432,26 @@ MessageRouteDAO messageRoute = null; // We need to compute the correlation keys (based on the operation - // we can - // infer which correlation keys to compute - this is merely a set + // we can infer which correlation keys to compute - this is merely a set // consisting of each correlationKey used in each correlation sets - // that is - // ever - // referenced in an <receive>/<onMessage> on this + // that is ever referenced in an <receive>/<onMessage> on this // partnerlink/operation. - keys = computeCorrelationKeys(mex); + try { + keys = computeCorrelationKeys(mex); + } catch (InvalidMessageException ime) { + // We'd like to do a graceful exit here, no sense in rolling back due to a + // a message format problem. + __log.debug("Unable to evaluate correlation keys, invalid message format. 
",ime); + mex.setFailure(FailureType.FORMAT_ERROR, ime.getMessage(), null); + return; + } String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID); String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID); if (__log.isDebugEnabled()) { - __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys) - + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId); + __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId + + " partnerSessionId=" + partnerSessionId); } CorrelationKey matchedKey = null; @@ -476,9 +493,8 @@ BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), mex); // send process instance event - NewProcessInstanceEvent evt = new NewProcessInstanceEvent( - new QName(_oprocess.targetNamespace, _oprocess.getName()), getProcessDAO().getProcessId(), newInstance - .getInstanceId()); + NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_oprocess.targetNamespace, + _oprocess.getName()), getProcessDAO().getProcessId(), newInstance.getInstanceId()); evt.setPortType(mex.getPortType().getQName()); evt.setOperation(operation.getName()); evt.setMexId(mex.getMessageExchangeId()); @@ -494,10 +510,6 @@ + messageRoute.getTargetInstance().getInstanceId()); } - // Attempt to acquire an instance-level lock. - // _lockManager.lock(messageRoute.getTargetInstance().getInstanceId(), - // 60, TimeUnit.SECONDS); - ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance(); // Reload process instance for DAO. @@ -505,14 +517,12 @@ instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex); // Kill the route so some new message does not get routed to - // same - // process - // instance. + // same process instance. 
correlator.removeRoutes(messageRoute.getGroupId(), instanceDao); // send process instance event - CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess.getName()), - getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey); + CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess + .getName()), getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey); evt.setPortType(mex.getPortType().getQName()); evt.setOperation(operation.getName()); evt.setMexId(mex.getMessageExchangeId()); @@ -537,8 +547,8 @@ } else { // send event - CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex.getOperation() - .getName(), mex.getMessageExchangeId(), keys); + CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex + .getOperation().getName(), mex.getMessageExchangeId(), keys); evt.setProcessId(getProcessDAO().getProcessId()); evt.setProcessName(new QName(_oprocess.targetNamespace, _oprocess.getName())); @@ -552,8 +562,10 @@ } } - // Now we have to update our message exchange status. If the <reply> was not hit during the - // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way + // Now we have to update our message exchange status. If the <reply> + // was not hit during the + // invocation, then we will be in the "REQUEST" phase which means + // that either this was a one-way // or a two-way that needs to delivery the reply asynchronously. 
if (mex.getStatus() == Status.REQUEST) { mex.setStatus(Status.ASYNC); @@ -576,7 +588,8 @@ Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation); for (OScope.CorrelationSet cset : csets) { - CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()), msg); + CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()), + msg); keys.add(key); } @@ -588,7 +601,8 @@ return keys.toArray(new CorrelationKey[keys.size()]); } - private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype, Element msg) { + private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype, + Element msg) { String[] values = new String[cset.properties.size()]; int jIdx = 0; @@ -599,8 +613,8 @@ if (alias == null) { // TODO: Throw a real exception! And catch this at compile // time. - throw new IllegalArgumentException("No alias matching property '" + property.name + "' with message type '" - + messagetype + "'"); + throw new IllegalArgumentException("No alias matching property '" + property.name + + "' with message type '" + messagetype + "'"); } String value; @@ -635,7 +649,8 @@ __log.debug("Processing partner's response for partnerLink: " + messageExchange); } - BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null, null); + BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null, + null); processInstance.invocationResponse(messageExchange); processInstance.execute(); } @@ -697,8 +712,8 @@ } ProcessDAO getProcessDAO() { - return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : _engine._contexts.dao - .getConnection().getProcess(_pid); + return _pconf.isTransient() ? 
_engine._contexts.inMemDao.getConnection().getProcess(_pid) + : _engine._contexts.dao.getConnection().getProcess(_pid); } static String genCorrelatorId(OPartnerLink plink, String opName) { @@ -724,7 +739,8 @@ myrole._initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, myrole._endpoint, myrole._plinkDef.myRolePortType); - __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is " + myrole._initialEPR); + __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is " + + myrole._initialEPR); } for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) { @@ -736,12 +752,13 @@ prole._initialEPR = epr; } - __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " + prole._initialEPR); + __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " + + prole._initialEPR); } __log.debug("Activated " + _pid); - + markused(); } @@ -805,8 +822,8 @@ public long getLastUsed() { return _lastUsed; } - - /** Keep track of the time the process was last used. */ + + /** Keep track of the time the process was last used. */ private final void markused() { _lastUsed = System.currentTimeMillis(); }