Author: mszefler
Date: Tue Sep 19 17:42:16 2006
New Revision: 448019
URL: http://svn.apache.org/viewvc?view=rev&rev=448019
Log:
Further tweaks on the correlation mechanism. The Correlator Message/Selector entry
has been split into two tables due to Hibernate bugs in
query generation for tables with discriminators. Eliminated the UNIQ constraint
mechanism for correlation and replaced it with a mandatory tx
boundary at the <receive>/<pick> activity (even if the message has already
arrived). This mechanism seems to work a lot better, has no
rollbacks and no lost messages.
Added:
incubator/ode/trunk/axis2-examples/.classpath (with props)
incubator/ode/trunk/axis2-examples/.project (with props)
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
(with props)
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java
(with props)
Removed:
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelationKeySerializer.java
Modified:
incubator/ode/trunk/axis2-war/src/main/webapp/WEB-INF/classes/log4j.properties
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/common/CorrelationKey.java
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/SessionManager.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelationSetDaoImpl.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelator.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorEntry.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
Added: incubator/ode/trunk/axis2-examples/.classpath
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2-examples/.classpath?view=auto&rev=448019
==============================================================================
--- incubator/ode/trunk/axis2-examples/.classpath (added)
+++ incubator/ode/trunk/axis2-examples/.classpath Tue Sep 19 17:42:16 2006
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path=""/>
+ <classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="output" path=""/>
+</classpath>
Propchange: incubator/ode/trunk/axis2-examples/.classpath
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/ode/trunk/axis2-examples/.project
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2-examples/.project?view=auto&rev=448019
==============================================================================
--- incubator/ode/trunk/axis2-examples/.project (added)
+++ incubator/ode/trunk/axis2-examples/.project Tue Sep 19 17:42:16 2006
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>axis2-examples</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
Propchange: incubator/ode/trunk/axis2-examples/.project
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/ode/trunk/axis2-war/src/main/webapp/WEB-INF/classes/log4j.properties
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2-war/src/main/webapp/WEB-INF/classes/log4j.properties?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/axis2-war/src/main/webapp/WEB-INF/classes/log4j.properties
(original)
+++
incubator/ode/trunk/axis2-war/src/main/webapp/WEB-INF/classes/log4j.properties
Tue Sep 19 17:42:16 2006
@@ -7,12 +7,10 @@
log4j.category.org.objectweb=ERROR
log4j.category.org.apache.ode.axis2=DEBUG
log4j.category.org.apache.ode.bpel.engine=DEBUG
+log4j.category.org.apache.ode.daohib.bpel.CorrelatorDaoImpl=DEBUG
log4j.category.org.apache.ode.bpel.epr=INFO
# Console appender
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.SimpleLayout
-
-# Uncomment the following if you want fancy
-#log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%p - %C{1}.%M(%L) | %m%n
Modified:
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
(original)
+++
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
Tue Sep 19 17:42:16 2006
@@ -46,19 +46,9 @@
if (__log.isDebugEnabled())
__log.debug("Invoking a partner operation: " +
partnerRoleMessageExchange.getOperationName());
-// EndpointReference epr =
partnerRoleMessageExchange.getEndpointReference();
-// // We only invoke with WSA endpoints, that makes our life easier
-// if (!(epr instanceof WSAEndpoint))
-// epr = EndpointFactory.convert(new
QName(Namespaces.WS_ADDRESSING_NS, "EndpointReference"),
-// epr.toXML().getDocumentElement());
-// // It's now safe to cast
-// QName serviceName = ((WSAEndpoint)epr).getServiceName();
-// String portName = ((WSAEndpoint)epr).getPortName();
-// if (__log.isDebugEnabled())
-// __log.debug("The service to invoke is the external service " +
serviceName);
-// ExternalService service = _server.getExternalService(serviceName,
portName);
-
- ExternalService service = (ExternalService)
partnerRoleMessageExchange.getChannel();
+ ExternalService service =
(ExternalService)partnerRoleMessageExchange.getChannel();
+ if (__log.isDebugEnabled())
+ __log.debug("The service to invoke is the external service " +
service);
service.invoke(partnerRoleMessageExchange);
}
Modified:
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
(original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
Tue Sep 19 17:42:16 2006
@@ -307,7 +307,7 @@
try {
_connector.start();
} catch (Exception e) {
- __log.error("Failed to initialize JCA connector.",e);
+ __log.error("Failed to initialize JCA connector.");
}
}
}
Modified:
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
(original)
+++
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
Tue Sep 19 17:42:16 2006
@@ -36,6 +36,7 @@
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.Namespaces;
@@ -155,7 +156,8 @@
boolean timeout = false;
// Invocation response could be delayed, if so we have to wait
// for it.
- if (odeMex.getStatus() == MessageExchange.Status.ASYNC) {
+ if (odeMex.getMessageExchangePattern() ==
MessageExchangePattern.REQUEST_RESPONSE &&
+ odeMex.getStatus() == MessageExchange.Status.ASYNC) {
odeMex = callback.getResponse(TIMEOUT);
if (odeMex == null)
timeout = true;
Modified:
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/common/CorrelationKey.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/common/CorrelationKey.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/common/CorrelationKey.java
(original)
+++
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/common/CorrelationKey.java
Tue Sep 19 17:42:16 2006
@@ -25,9 +25,9 @@
import org.apache.ode.utils.ArrayUtils;
-
/**
- * <p>Message correlation key. Correlation keys are used to match up incoming
+ * <p>
+ * Message correlation key. Correlation keys are used to match up incoming
* messages with a particular process <em>instance</em>. The basic procedure
* is to generate and save a correlation key when a <code>receive</code> or
* <code>pick</em> activity is activated, and then to match incoming messages
@@ -44,133 +44,164 @@
* the <code><propertyAlias;></code> BPEL process document element).
* </p>
*/
-public class CorrelationKey implements Serializable{
+public class CorrelationKey implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- /** CorrelationSet identifier. */
- private int _csetId;
+ /** CorrelationSet identifier. */
+ private int _csetId;
- /** Key values. */
- private final String _keyValues[];
-
-
- /**
- * Constructor.
- * @param csetId correlation set identifier
- * @param keyValues correlation key values
- */
- public CorrelationKey(int csetId, String[] keyValues) {
- _csetId = csetId;
- _keyValues = keyValues;
- }
-
- public CorrelationKey(String canonicalForm) {
- int firstTilde = canonicalForm.indexOf('~');
- _csetId = Integer.parseInt(canonicalForm.substring(0,firstTilde == -1 ?
canonicalForm.length() : firstTilde));
-
- if (firstTilde != -1) {
- List<String> keys = new ArrayList<String>();
- char chars[] = canonicalForm.toCharArray();
- StringBuffer work = new StringBuffer();
- for (int i = firstTilde+1; i < chars.length; ++i) {
- boolean isLast = (i == chars.length - 1);
- if (chars[i] == '~' && !isLast && chars[i+1] == '~') {
- work.append(chars[i++]);
- }
- else if (chars[i] == '~') {
- keys.add(work.toString());
- work = new StringBuffer();
- } else {
- work.append(chars[i]);
+ /** Key values. */
+ private final String _keyValues[];
+
+ /**
+ * Constructor.
+ *
+ * @param csetId
+ * correlation set identifier
+ * @param keyValues
+ * correlation key values
+ */
+ public CorrelationKey(int csetId, String[] keyValues) {
+ _csetId = csetId;
+ _keyValues = keyValues;
+ }
+
+ public CorrelationKey(String canonicalForm) {
+ int firstTilde = canonicalForm.indexOf('~');
+ _csetId = Integer.parseInt(canonicalForm.substring(0, firstTilde == -1
? canonicalForm.length() : firstTilde));
+
+ if (firstTilde != -1) {
+ List<String> keys = new ArrayList<String>();
+ char chars[] = canonicalForm.toCharArray();
+ StringBuffer work = new StringBuffer();
+ for (int i = firstTilde + 1; i < chars.length; ++i) {
+ boolean isLast = (i == chars.length - 1);
+ if (chars[i] == '~' && !isLast && chars[i + 1] == '~') {
+ work.append(chars[i++]);
+ } else if (chars[i] == '~') {
+ keys.add(work.toString());
+ work = new StringBuffer();
+ } else {
+ work.append(chars[i]);
+ }
+ }
+ keys.add(work.toString());
+ _keyValues = new String[keys.size()];
+ keys.toArray(_keyValues);
+ } else {
+ _keyValues = new String[0];
}
- }
- keys.add(work.toString());
- _keyValues = new String[keys.size()];
- keys.toArray(_keyValues);
- } else {
- _keyValues = new String[0];
- }
- }
-
- /** Return the OCorrelation id for the correlation set */
- public int getCSetId(){
- return _csetId;
- }
-
- /** Return the values for the correlation set */
- public String[] getValues(){
- return _keyValues;
- }
-
-
- /**
- * Check if this key matches any member in a set of keys.
- *
- * @param keys set of keys to match against
- *
- * @return <code>true</code> if one of the keys in the set
- * <code>equals(..)</code> this key, <code>false</code> otherwise
- */
- public boolean isMatch(CorrelationKey[] keys) {
- for (CorrelationKey key : keys)
- if (key.equals(this)) {
- return true;
- }
+ }
- return false;
- }
+ /** Return the OCorrelation id for the correlation set */
+ public int getCSetId() {
+ return _csetId;
+ }
- /**
- * Equals comperator method.
- *
- * @param o <code>CorrelationKey</code> object to compare with
- *
- * @return <code>true</code> if the given object
- */
- public boolean equals(Object o) {
- CorrelationKey okey = (CorrelationKey)o;
+ /** Return the values for the correlation set */
+ public String[] getValues() {
+ return _keyValues;
+ }
- if (okey == null || okey._csetId != _csetId || okey._keyValues.length !=
_keyValues.length)
- return false;
+ /**
+ * Check if this key matches any member in a set of keys.
+ *
+ * @param keys
+ * set of keys to match against
+ *
+ * @return <code>true</code> if one of the keys in the set
+ * <code>equals(..)</code> this key, <code>false</code>
+ * otherwise
+ */
+ public boolean isMatch(CorrelationKey[] keys) {
+ for (CorrelationKey key : keys)
+ if (key.equals(this)) {
+ return true;
+ }
- for (int i = 0; i < _keyValues.length; ++i)
- if (!_keyValues[i].equals(okey._keyValues[i]))
return false;
+ }
- return true;
- }
+ /**
+ * Equals comperator method.
+ *
+ * @param o
+ * <code>CorrelationKey</code> object to compare with
+ *
+ * @return <code>true</code> if the given object
+ */
+ public boolean equals(Object o) {
+ CorrelationKey okey = (CorrelationKey) o;
+
+ if (okey == null || okey._csetId != _csetId || okey._keyValues.length
!= _keyValues.length)
+ return false;
+
+ for (int i = 0; i < _keyValues.length; ++i)
+ if (!_keyValues[i].equals(okey._keyValues[i]))
+ return false;
- /**
- * Generate a hash code from the hash codes of the elements.
- * @see HashMap#hashCode
- * @see Object#hashCode
- */
- public int hashCode() {
- int hashCode = _csetId;
- for (String _keyValue : _keyValues) hashCode ^= _keyValue.hashCode();
- return hashCode;
- }
-
- public List<String> toCanonicalList() {
- ArrayList<String> ret = new ArrayList<String>(_keyValues.length+1);
- ret.add(((Integer)_csetId).toString());
- for (String i : _keyValues)
- ret.add(i);
- return ret;
- }
- /**
- * @see Object#toString
- */
- public String toString() {
- StringBuffer buf = new StringBuffer("{CorrelationKey ");
- buf.append("setId=");
- buf.append(_csetId);
- buf.append(", values=");
- buf.append(ArrayUtils.makeCollection(ArrayList.class,_keyValues));
- buf.append('}');
+ return true;
+ }
- return buf.toString();
- }
+ /**
+ * Generate a hash code from the hash codes of the elements.
+ *
+ * @see HashMap#hashCode
+ * @see Object#hashCode
+ */
+ public int hashCode() {
+ int hashCode = _csetId;
+ for (String _keyValue : _keyValues)
+ hashCode ^= _keyValue.hashCode();
+ return hashCode;
+ }
+
+ public List<String> toCanonicalList() {
+ ArrayList<String> ret = new ArrayList<String>(_keyValues.length + 1);
+ ret.add(((Integer) _csetId).toString());
+ for (String i : _keyValues)
+ ret.add(i);
+ return ret;
+ }
+
+ /**
+ * @see Object#toString
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer("{CorrelationKey ");
+ buf.append("setId=");
+ buf.append(_csetId);
+ buf.append(", values=");
+ buf.append(ArrayUtils.makeCollection(ArrayList.class, _keyValues));
+ buf.append('}');
+
+ return buf.toString();
+ }
+
+ public String toCanonicalString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append(this.getCSetId());
+ buf.append('~');
+ for (int i = 0; i < getValues().length; ++i) {
+ if (i != 0)
+ buf.append('~');
+ escapeTilde(buf, getValues()[i]);
+ }
+ return buf.toString();
+ }
+
+ static void escapeTilde(StringBuffer buf, String str) {
+ if (str == null)
+ return;
+ char[] chars = str.toCharArray();
+ for (char achar : chars) {
+ if (achar == '~') {
+ buf.append("~~");
+ } else {
+ buf.append(achar);
+ }
+ }
+ }
}
Modified:
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
(original)
+++
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
Tue Sep 19 17:42:16 2006
@@ -101,7 +101,7 @@
ABORTED,
/** Other failure. */
- OTHER
+ OTHER, NOMATCH
}
/**
Modified:
incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
(original)
+++
incubator/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
Tue Sep 19 17:42:16 2006
@@ -243,7 +243,10 @@
* Delete previously registered activity recovery.
*/
void deleteActivityRecovery(String channel);
-
+
+ /** Expermiental -- lock. */
+ //public void lock();
+
/**
* Transport object holding the date of the first and last instance event
* along with the number events.
@@ -253,5 +256,6 @@
public Date last;
public int count;
}
+
}
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Tue Sep 19 17:42:16 2006
@@ -19,6 +19,7 @@
package org.apache.ode.bpel.engine;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
@@ -45,6 +46,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* Implementation of the {@link BpelEngine} interface: provides the
server
@@ -54,8 +57,21 @@
*
*/
public class BpelEngineImpl implements BpelEngine {
-
private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
+ /** RNG, for delays */
+ private Random _random = new Random(System.currentTimeMillis());
+
+ private static double _delayMean = 0;
+ static {
+ try {
+ _delayMean = Double.valueOf(System.getenv("ODE_DEBUG_TX_DELAY"));
+ __log.info("Stochastic debugging delay activated. Delay (Mean)=" +
_delayMean + "ms.");
+ } catch (Throwable t) {
+ __log.error("Could not read ODE_DEBUG_TX_DELAY environment
variable! Assuming 0 (mean) delay");
+
+ }
+ }
+
private static final Messages __msgs =
MessageBundle.getMessages(Messages.class);
@@ -201,7 +217,6 @@
}
public void onScheduledJob(String jobId, Map<String, Object> jobDetail) {
-
WorkEvent we = new WorkEvent(jobDetail);
ProcessInstanceDAO instance =
_contexts.dao.getConnection().getInstance(we.getIID());
@@ -227,6 +242,18 @@
assert process != null;
process.handleWorkEvent(jobDetail);
+
+ // Do a delay for debugging purposes.
+ if (_delayMean != 0 )
+ try {
+ double u = _random.nextDouble(); // Uniform
+ long delay = (long)(-Math.log(u)*_delayMean); // Exponential
distribution with mean _delayMean
+ __log.warn("Debugging delay has been activated; delaying
transaction for " + delay + "ms." );
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ ; // ignore
+ }
+
}
public MessageExchange getMessageExchangeByClientKey(String clientKey) {
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Tue Sep 19 17:42:16 2006
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bom.api.CorrelationSet;
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.common.InvalidMessageException;
@@ -76,6 +77,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* Entry point into the runtime of a BPEL process.
@@ -111,6 +113,9 @@
private DeploymentUnit _du;
+ /** WARNING - EXPERIMENTAL */
+ private InstanceLockManager _lockManager = new InstanceLockManager();
+
public BpelProcess(QName pid, DeploymentUnit du, OProcess oprocess,
Map<OPartnerLink, Endpoint> myRoleEndpointNames, Map<OPartnerLink,
Endpoint> initialPartners,
BpelEventListener debugger, ExpressionLanguageRuntimeRegistry
expLangRuntimeRegistry,
@@ -140,9 +145,9 @@
}
public String toString() {
- return "BpelProcess[" + _pid + " in " + _du + "]";
+ return "BpelProcess[" + _pid + " in " + _du + "]";
}
-
+
static String generateMessageExchangeIdentifier(String partnerlinkName,
String operationName) {
StringBuffer sb = new StringBuffer(partnerlinkName);
sb.append('.');
@@ -177,7 +182,7 @@
}
private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
- for (Map.Entry<Endpoint,PartnerLinkMyRoleImpl> e :
_endpointToMyRoleMap.entrySet()) {
+ for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e :
_endpointToMyRoleMap.entrySet()) {
if (e.getKey().serviceName.equals(serviceName))
return e.getValue();
}
@@ -278,20 +283,19 @@
* <code>false</code> otherwise
*/
private boolean processInterceptors(MyRoleMessageExchangeImpl mex,
InterceptorInvoker invoker) {
- InterceptorContextImpl ictx = new
InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO());
-
- for (MessageExchangeInterceptor i : _mexInterceptors)
- if (!mex.processInterceptor(i,mex, ictx, invoker))
- return false;
+ InterceptorContextImpl ictx = new
InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO());
+
+ for (MessageExchangeInterceptor i : _mexInterceptors)
+ if (!mex.processInterceptor(i, mex, ictx, invoker))
+ return false;
for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
- if (!mex.processInterceptor(i,mex, ictx, invoker))
- return false;
-
-
+ if (!mex.processInterceptor(i, mex, ictx, invoker))
+ return false;
+
return true;
-
+
}
-
+
/**
* Replacement object for serializtation of the {@link OBase}
(compiled
* BPEL) objects in the JACOB VPU.
@@ -419,8 +423,8 @@
String partnerSessionId =
mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
if (__log.isDebugEnabled()) {
__log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
- + ArrayUtils.makeCollection(HashSet.class, keys)
- + " mySessionId=" + mySessionId + "
partnerSessionId=" + partnerSessionId);
+ + ArrayUtils.makeCollection(HashSet.class, keys) + "
mySessionId=" + mySessionId
+ + " partnerSessionId=" + partnerSessionId);
}
CorrelationKey matchedKey = null;
@@ -436,39 +440,12 @@
break;
}
}
- // Handling the "opaque correlation case": correlation is done on a
- // session identifier associated with my epr
- if (messageRoute == null) {
- String sessionId =
mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
- if (sessionId != null) {
- CorrelationKey key = new CorrelationKey(-1, new String[] {
sessionId });
- messageRoute = correlator.findRoute(key);
- if (__log.isDebugEnabled())
- __log.debug("INPUTMSG: rouing based on session id " +
sessionId + " --> " + messageRoute);
-
- }
- }
// TODO - ODE-58
- // If no luck try to find a default route.
- if (messageRoute == null) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId
- + ": CorrelationKey routing failed; checking
default route.");
- }
-
- messageRoute = correlator.findRoute(null);
-
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": default route
is to " + messageRoute);
- }
- }
-
- // If still no luck, and this operation qualifies for
- // create-instance
- // treatment,
- // then create a new process instance.
+ // If no luck, and this operation qualifies for create-instance
+ // treatment, then create a new process
+ // instance.
if (messageRoute == null && isCreateInstnace) {
if (__log.isDebugEnabled()) {
__log.debug("INPUTMSG: " + correlatorId + ": default
routing failed, CREATING NEW INSTANCE");
@@ -477,12 +454,12 @@
if (processDAO.isRetired()) {
throw new InvalidProcessException("Process is retired.",
InvalidProcessException.RETIRED_CAUSE_CODE);
}
-
+
if (!processInterceptors(mex,
InterceptorInvoker.__onNewInstanceInvoked)) {
- __log.debug("Not creating a new instance for mex " +
mex + "; interceptor prevented!");
- return;
+ __log.debug("Not creating a new instance for mex " + mex +
"; interceptor prevented!");
+ return;
}
-
+
ProcessInstanceDAO newInstance =
processDAO.createInstance(correlator);
BpelRuntimeContextImpl instance =
createRuntimeContext(newInstance, new PROCESS(_oprocess), mex);
@@ -504,74 +481,69 @@
+
messageRoute.getTargetInstance().getInstanceId());
}
+ // Attempt to acquire an instance-level lock.
+ //
_lockManager.lock(messageRoute.getTargetInstance().getInstanceId(),
+ // 60, TimeUnit.SECONDS);
+
+ ProcessInstanceDAO instanceDao =
messageRoute.getTargetInstance();
+
// Reload process instance for DAO.
- BpelRuntimeContextImpl instance =
createRuntimeContext(messageRoute.getTargetInstance(), null, null);
+ BpelRuntimeContextImpl instance =
createRuntimeContext(instanceDao, null, null);
instance.inputMsgMatch(messageRoute.getGroupId(),
messageRoute.getIndex(), mex);
// Kill the route so some new message does not get routed to
// same
// process
// instance.
- correlator.removeRoutes(messageRoute.getGroupId(),
messageRoute.getTargetInstance());
-
- Long iid = messageRoute.getTargetInstance().getInstanceId();
+ correlator.removeRoutes(messageRoute.getGroupId(),
instanceDao);
// send process instance event
CorrelationMatchEvent evt = new CorrelationMatchEvent(new
QName(_oprocess.targetNamespace, _oprocess
- .getName()), getProcessDAO().getProcessId(), iid,
matchedKey);
+ .getName()), getProcessDAO().getProcessId(),
instanceDao.getInstanceId(), matchedKey);
evt.setPortType(mex.getPortType().getQName());
evt.setOperation(operation.getName());
evt.setMexId(mex.getMessageExchangeId());
_debugger.onEvent(evt);
// store event
- getProcessDAO().getInstance(iid).insertBpelEvent(evt);
+ instanceDao.insertBpelEvent(evt);
+
+ // EXPERIMENTAL -- LOCK
+ //instanceDao.lock();
mex.setCorrelationStatus(CorrelationStatus.MATCHED);
mex.getDAO().setInstance(messageRoute.getTargetInstance());
-
- // run the vpu
instance.execute();
} else {
if (__log.isDebugEnabled()) {
__log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB
(no match) ");
}
- // send event
- CorrelationNoMatchEvent evt = new
CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
- .getOperation().getName(), mex.getMessageExchangeId(),
keys);
+ if (!mex.isAsynchronous()) {
+ mex.setFailure(FailureType.NOMATCH, "No process instance
matching correlation keys.", null);
- evt.setProcessId(getProcessDAO().getProcessId());
- evt.setProcessName(new QName(_oprocess.targetNamespace,
_oprocess.getName()));
- _debugger.onEvent(evt);
+ } else {
+ // send event
+ CorrelationNoMatchEvent evt = new
CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+ .getOperation().getName(),
mex.getMessageExchangeId(), keys);
+
+ evt.setProcessId(getProcessDAO().getProcessId());
+ evt.setProcessName(new QName(_oprocess.targetNamespace,
_oprocess.getName()));
+ _debugger.onEvent(evt);
- mex.setCorrelationStatus(CorrelationStatus.QUEUED);
+ mex.setCorrelationStatus(CorrelationStatus.QUEUED);
- // No match, means we add message exchange to the queue.
- correlator.enqueueMessage(mex.getDAO(), keys);
+ // No match, means we add message exchange to the queue.
+ correlator.enqueueMessage(mex.getDAO(), keys);
+ }
}
- // Now we have to update our message exchange status. If the
<reply>
- // was
- // not hit during the invocation, then we will be in the "REQUEST"
- // phase
- // which means that either this was a one-way or a two-way that
- // needs to
- // delivery the reply asynchronously.
+ // Now we have to update our message exchange status. If the
<reply> was not hit during the
+ // invocation, then we will be in the "REQUEST" phase which means
that either this was a one-way
+ // or a two-way that needs to delivery the reply asynchronously.
if (mex.getStatus() == Status.REQUEST) {
- switch (mex.getPattern()) {
- case REQUEST_ONLY:
- mex.setStatus(Status.ONE_WAY);
- break;
- case REQUEST_RESPONSE:
- mex.setStatus(Status.ASYNC);
- break;
- default:
- String errmsg = "BpelProcess: internal error, message
exchange pattern not set";
- __log.fatal(errmsg);
- throw new BpelEngineException(errmsg);
- }
+ mex.setStatus(Status.ASYNC);
}
}
@@ -596,6 +568,11 @@
keys.add(key);
}
+ // Let's creata a key based on the sessionId
+ String mySessionId =
mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+ if (mySessionId != null)
+ keys.add(new CorrelationKey(-1, new String[] { mySessionId }));
+
return keys.toArray(new CorrelationKey[keys.size()]);
}
@@ -699,6 +676,12 @@
processInstance.invocationResponse(we.getMexId(), we.getChannel());
processInstance.execute();
break;
+ case MATCHER:
+ if (__log.isDebugEnabled()) {
+ __log.debug("Matcher event for iid " + we.getIID());
+ }
+
+ processInstance.matcherEvent(we.getCorrelatorId(),
we.getCorrelationKey());
}
}
@@ -722,15 +705,15 @@
void activate(BpelEngineImpl engine) {
_engine = engine;
_debugger = new DebuggerSupport(this);
-
+
__log.debug("Activating " + _pid);
// Activate all the my-role endpoints.
for (PartnerLinkMyRoleImpl myrole : _myRoles.values()) {
myrole._initialEPR =
_engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, _du,
myrole._endpoint,
myrole._plinkDef.myRolePortType);
-
- __log.debug("Activated " + _pid + " myrole " +
myrole.getPartnerLinkName()
- + ": EPR is " + myrole._initialEPR);
+
+ __log.debug("Activated " + _pid + " myrole " +
myrole.getPartnerLinkName() + ": EPR is "
+ + myrole._initialEPR);
}
for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
@@ -741,12 +724,12 @@
if (epr != null) {
prole._initialEPR = epr;
}
-
- __log.debug("Activated " + _pid + " partnerrole " +
prole.getPartnerLinkName()
- + ": EPR is " + prole._initialEPR);
-
+
+ __log.debug("Activated " + _pid + " partnerrole " +
prole.getPartnerLinkName() + ": EPR is "
+ + prole._initialEPR);
+
}
-
+
__log.debug("Activated " + _pid);
}
@@ -759,7 +742,7 @@
}
- EndpointReference getInitialPartnerRoleEPR(OPartnerLink link) {
+ EndpointReference getInitialPartnerRoleEPR(OPartnerLink link) {
PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link);
if (prole == null)
throw new IllegalStateException("Unknown partner link " + link);
@@ -778,9 +761,9 @@
}
PartnerRoleChannel getPartnerRoleChannel(OPartnerLink partnerLink) {
- PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink);
- if (prole == null)
- throw new IllegalStateException("Unknown partner link " +
partnerLink);
- return prole._channel;
- }
+ PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink);
+ if (prole == null)
+ throw new IllegalStateException("Unknown partner link " +
partnerLink);
+ return prole._channel;
+ }
}