Author: mszefler
Date: Thu Oct 12 12:02:46 2006
New Revision: 463382
URL: http://svn.apache.org/viewvc?view=rev&rev=463382
Log:
ODE-65 Fixed. Problem with MEMDAO, correlator removeRoutes was only
removing routes from one correlator.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java?view=diff&rev=463382&r1=463381&r2=463382
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
Thu Oct 12 12:02:46 2006
@@ -72,12 +72,7 @@
}
public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
- for (Iterator<MessageRouteDaoImpl> i = _routes.iterator(); i.hasNext();) {
- MessageRouteDaoImpl we = i.next();
- if ((we._groupId.equals(routeGroupId) || routeGroupId == null) &&
we._instance == target) {
- i.remove();
- }
- }
+ ((ProcessInstanceDaoImpl)target).removeRoutes(routeGroupId);
}
public void enqueueMessage(MessageExchangeDAO mex, CorrelationKey[] keys) {
@@ -99,6 +94,15 @@
_routes.add(mr);
}
+ void _removeRoutes(String routeGroupId, ProcessInstanceDaoImpl target) {
+ for (Iterator<MessageRouteDaoImpl> i = _routes.iterator(); i.hasNext();)
{
+ MessageRouteDaoImpl we = i.next();
+ if ((we._groupId.equals(routeGroupId) || routeGroupId == null) &&
we._instance == target) {
+ i.remove();
+ }
+ }
+
+ }
/**
* @see java.lang.Object#toString()
*/
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?view=diff&rev=463382&r1=463381&r2=463382
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
Thu Oct 12 12:02:46 2006
@@ -7,6 +7,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,7 +36,7 @@
private QName _processId;
private QName _type;
- private final Map<String, CorrelatorDAO> _correlators = new
ConcurrentHashMap<String, CorrelatorDAO>();
+ final Map<String, CorrelatorDaoImpl> _correlators = new
ConcurrentHashMap<String, CorrelatorDaoImpl>();
protected final Map<Long, ProcessInstanceDAO> _instances = new
ConcurrentHashMap<Long, ProcessInstanceDAO>();
protected final Map<Integer, PartnerLinkDAO> _plinks = new
ConcurrentHashMap<Integer, PartnerLinkDAO>();
private Map<QName, ProcessDaoImpl> _store;
@@ -72,9 +73,18 @@
}
public Collection<CorrelatorDAO> getCorrelators() {
- return _correlators.values();
- }
+ // Note: _correlators.values() is a Collection<CorrealatorDaoImpl>. We
can't just return this object
+ // since Collection<CorrelatorDAO> is /not/ assignment compatible with
Collection<CorrelatorDaoImpl>.
+ // However, a immutable Collection<CorrelationDAO> is assignment
compatible with Collection<CorrelatorDaoImpl>,
+ // but.... we need to introduce some ambiguity into the type hierarchy
so that Java will infer the correct type.
+
+ // Make an ambiguous collection.
+ Collection<? extends CorrelatorDAO> foo = _correlators.values();
+ // In order to get a collection of the super-type from a sub-type we
must make the collection read-only.
+ return Collections.unmodifiableCollection(foo);
+ }
+
public void removeRoutes(String routeId, ProcessInstanceDAO target) {
for (CorrelatorDAO correlatorDAO : _correlators.values()) {
correlatorDAO.removeRoutes(routeId, target);
@@ -212,4 +222,8 @@
public int getNumInstances() {
return _instances.size();
}
+
+ public ProcessInstanceDAO getInstanceWithLock(Long iid) {
+ return getInstance(iid);
+ }
}
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?view=diff&rev=463382&r1=463381&r2=463382
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
Thu Oct 12 12:02:46 2006
@@ -31,341 +31,371 @@
import java.util.Map;
import java.util.Set;
-
/**
- * A very simple, in-memory implementation of the [EMAIL PROTECTED]
ProcessInstanceDAO} interface.
+ * A very simple, in-memory implementation of the [EMAIL PROTECTED]
ProcessInstanceDAO}
+ * interface.
*/
- class ProcessInstanceDaoImpl extends DaoBaseImpl implements
ProcessInstanceDAO {
- private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS =
Collections.emptyList();
+class ProcessInstanceDaoImpl extends DaoBaseImpl implements ProcessInstanceDAO
{
+ private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS =
Collections.emptyList();
+
+ private short _previousState;
+
+ private short _state;
+
+ private Long _instanceId;
+
+ private ProcessDaoImpl _processDao;
+
+ private byte[] _jacobState;
+
+ private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
+
+ private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String,
List<ScopeDAO>>();
+
+ private Map<String, byte[]> _messageExchanges = new HashMap<String,
byte[]>();
+
+ private ScopeDAO _rootScope;
+
+ private FaultDAO _fault;
+
+ private CorrelatorDAO _instantiatingCorrelator;
+
+ private BpelDAOConnection _conn;
+
+ private int _failureCount;
+
+ private Date _failureDateTime;
+
+ private Map<String, ActivityRecoveryDAO> _activityRecoveries = new
HashMap<String, ActivityRecoveryDAO>();
+
+ // TODO: Remove this, we should be using the main event store...
+ private List<ProcessInstanceEvent> _events = new
ArrayList<ProcessInstanceEvent>();
+
+ private Date _lastActive;
+
+ private int _seq;
+
+ ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao,
CorrelatorDAO correlator) {
+ _state = 0;
+ _processDao = processDao;
+ _instantiatingCorrelator = correlator;
+ _jacobState = null;
+ _instanceId = IdGen.newProcessId();
+ _conn = conn;
+ }
+
+ public BpelDAOConnection getConnection() {
+ return _conn;
+ }
+
+ public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
+ ArrayList<XmlDataDAO> res = new ArrayList<XmlDataDAO>();
+ for (ScopeDAO scope : _scopes.values()) {
+ if (scope.getModelId() == scopeModelId) {
+ XmlDataDAO xmld = scope.getVariable(variableName);
+ if (xmld != null)
+ res.add(xmld);
+ }
+ }
+ return res.toArray(new XmlDataDAO[res.size()]);
+ }
+
+ public Set<CorrelationSetDAO> getCorrelationSets() {
+ HashSet<CorrelationSetDAO> res = new HashSet<CorrelationSetDAO>();
+ for (ScopeDAO scopeDAO : _scopes.values()) {
+ res.addAll(scopeDAO.getCorrelationSets());
+ }
+ return res;
+ }
+
+ public CorrelationSetDAO getCorrelationSet(String name) {
+ for (ScopeDAO scopeDAO : _scopes.values()) {
+ if (scopeDAO.getCorrelationSet(name) != null)
+ return scopeDAO.getCorrelationSet(name);
+ }
+ return null;
+ }
+
+ public void setFault(QName name, String explanation, int lineNo, int
activityId, Element faultData) {
+ _fault = new FaultDaoImpl(QNameUtils.fromQName(name), explanation,
faultData, lineNo, activityId);
+ }
+
+ public void setFault(FaultDAO fault) {
+ _fault = fault;
+ }
+
+ public FaultDAO getFault() {
+ return _fault;
+ }
+
+ /**
+ * @see ProcessInstanceDAO#getExecutionState()
+ */
+ public byte[] getExecutionState() {
+ return _jacobState;
+ }
+
+ public void setExecutionState(byte[] bytes) {
+ _jacobState = bytes;
+ }
+
+ public byte[] getMessageExchange(String identifier) {
+ byte[] mex = _messageExchanges.get(identifier);
+ assert (mex != null);
+ return mex;
+ }
+
+ /**
+ * @see ProcessInstanceDAO#getProcess()
+ */
+ public ProcessDAO getProcess() {
+ return _processDao;
+ }
+
+ /**
+ * @see ProcessInstanceDAO#getRootScope()
+ */
+ public ScopeDAO getRootScope() {
+ return _rootScope;
+ }
+
+ /**
+ * @see ProcessInstanceDAO#setState(short)
+ */
+ public void setState(short state) {
+ _previousState = _state;
+ _state = state;
+ if (state == ProcessState.STATE_TERMINATED) {
+ for (CorrelatorDAO correlatorDAO : _processDao.getCorrelators()) {
+ correlatorDAO.removeRoutes(null, this);
+ }
+ }
+ }
+
+ /**
+ * @see ProcessInstanceDAO#getState()
+ */
+ public short getState() {
+ return _state;
+ }
+
+ public void addMessageExchange(String identifier, byte[] data) {
+ assert (!_messageExchanges.containsKey(identifier));
+ _messageExchanges.put(identifier, data);
+ }
+
+ public ScopeDAO createScope(ScopeDAO parentScope, String scopeType, int
scopeModelId) {
+ ScopeDaoImpl newScope = new ScopeDaoImpl(this, parentScope, scopeType,
scopeModelId);
+ _scopes.put(newScope.getScopeInstanceId(), newScope);
+ List<ScopeDAO> namedScopes = _scopesByName.get(scopeType);
+ if (namedScopes == null) {
+ namedScopes = new LinkedList<ScopeDAO>();
+ _scopesByName.put(scopeType, namedScopes);
+ }
+ namedScopes.add(newScope);
+ if (parentScope == null) {
+ assert _rootScope == null;
+ _rootScope = newScope;
+ }
+
+ return newScope;
+ }
+
+ public Long getInstanceId() {
+ return _instanceId;
+ }
- private short _previousState;
- private short _state;
- private Long _instanceId;
- private ProcessDaoImpl _processDao;
- private byte[] _jacobState;
- private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
- private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String,
List<ScopeDAO>>();
- private Map<String, byte[]> _messageExchanges = new HashMap<String,
byte[]>();
- private ScopeDAO _rootScope;
- private FaultDAO _fault;
- private CorrelatorDAO _instantiatingCorrelator;
- private BpelDAOConnection _conn;
- private int _failureCount;
- private Date _failureDateTime;
- private Map<String, ActivityRecoveryDAO> _activityRecoveries = new
HashMap<String, ActivityRecoveryDAO>();
-
- // TODO: Remove this, we should be using the main event store...
- private List<ProcessInstanceEvent> _events = new
ArrayList<ProcessInstanceEvent>();
-
- private Date _lastActive;
-
- private int _seq;
-
- ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao,
CorrelatorDAO correlator) {
- _state = 0;
- _processDao = processDao;
- _instantiatingCorrelator = correlator;
- _jacobState = null;
- _instanceId = IdGen.newProcessId();
- _conn = conn;
- }
-
- public BpelDAOConnection getConnection() {
- return _conn;
- }
-
- public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
- ArrayList<XmlDataDAO> res = new ArrayList<XmlDataDAO>();
- for (ScopeDAO scope : _scopes.values()) {
- if (scope.getModelId() == scopeModelId) {
- XmlDataDAO xmld = scope.getVariable(variableName);
- if (xmld != null) res.add(xmld);
- }
- }
- return res.toArray(new XmlDataDAO[res.size()]);
- }
-
- public Set<CorrelationSetDAO> getCorrelationSets() {
- HashSet<CorrelationSetDAO> res = new HashSet<CorrelationSetDAO>();
- for (ScopeDAO scopeDAO : _scopes.values()) {
- res.addAll(scopeDAO.getCorrelationSets());
- }
- return res;
- }
-
- public CorrelationSetDAO getCorrelationSet(String name) {
- for (ScopeDAO scopeDAO : _scopes.values()) {
- if (scopeDAO.getCorrelationSet(name) != null) return
scopeDAO.getCorrelationSet(name);
- }
- return null;
- }
-
- public void setFault(QName name, String explanation, int lineNo, int
activityId, Element faultData) {
- _fault = new FaultDaoImpl(QNameUtils.fromQName(name), explanation,
faultData, lineNo, activityId);
- }
-
- public void setFault(FaultDAO fault) {
- _fault = fault;
- }
-
- public FaultDAO getFault() {
- return _fault;
- }
-
- /**
- * @see ProcessInstanceDAO#getExecutionState()
- */
- public byte[] getExecutionState() {
- return _jacobState;
- }
-
- public void setExecutionState(byte[] bytes) {
- _jacobState = bytes;
- }
-
- public byte[] getMessageExchange(String identifier) {
- byte[] mex = _messageExchanges.get(identifier);
- assert (mex != null);
- return mex;
- }
-
- /**
- * @see ProcessInstanceDAO#getProcess()
- */
- public ProcessDAO getProcess() {
- return _processDao;
- }
-
- /**
- * @see ProcessInstanceDAO#getRootScope()
- */
- public ScopeDAO getRootScope() {
- return _rootScope;
- }
-
- /**
- * @see ProcessInstanceDAO#setState(short)
- */
- public void setState(short state) {
- _previousState = _state;
- _state = state;
- if(state==ProcessState.STATE_TERMINATED) {
- for (CorrelatorDAO correlatorDAO : _processDao.getCorrelators()) {
- correlatorDAO.removeRoutes(null, this);
- }
- }
- }
-
- /**
- * @see ProcessInstanceDAO#getState()
- */
- public short getState() {
- return _state;
- }
-
- public void addMessageExchange(String identifier, byte[] data) {
- assert (!_messageExchanges.containsKey(identifier));
- _messageExchanges.put(identifier, data);
- }
-
-
- public ScopeDAO createScope(ScopeDAO parentScope, String scopeType, int
scopeModelId) {
- ScopeDaoImpl newScope = new ScopeDaoImpl(this, parentScope, scopeType,
scopeModelId);
- _scopes.put(newScope.getScopeInstanceId(), newScope);
- List<ScopeDAO> namedScopes = _scopesByName.get(scopeType);
- if(namedScopes == null){
- namedScopes = new LinkedList<ScopeDAO>();
- _scopesByName.put(scopeType, namedScopes);
- }
- namedScopes.add(newScope);
- if (parentScope == null) {
- assert _rootScope == null;
- _rootScope = newScope;
- }
-
- return newScope;
- }
-
- public Long getInstanceId() {
- return _instanceId;
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScope(java.lang.Long)
- */
- public ScopeDAO getScope(Long scopeInstanceId) {
- return _scopes.get(scopeInstanceId);
- }
-
- public List<ProcessInstanceEvent> getEvents(int idx, int count) {
- int sidx = Math.max(idx, 0);
- sidx = Math.min(sidx, _events.size()-1);
- int eidx = Math.min(sidx + count, _events.size());
- return _events.subList(sidx, eidx);
- }
-
- /**
- * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#insertBpelEvent(org.apache.ode.bpel.evt.ProcessInstanceEvent)
- */
- public void insertBpelEvent(ProcessInstanceEvent event) {
- _events.add(event);
- }
-
- public int getEventCount() {
- return _events.size();
- }
-
- /**
- * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
- */
- public CorrelatorDAO getInstantiatingCorrelator() {
- return _instantiatingCorrelator;
- }
-
- /**
- * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
- */
- public Collection<ScopeDAO> getScopes(String scopeName) {
- List<ScopeDAO> scopes = _scopesByName.get(scopeName);
- return (scopes == null ? EMPTY_SCOPE_DAOS : scopes);
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
- */
- public short getPreviousState() {
- return _previousState;
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
- */
- public Date getLastActiveTime() {
- return _lastActive;
- }
-
- /**
- * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
- */
- public void setLastActiveTime(Date dt) {
- _lastActive = dt;
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
- */
- public void finishCompletion() {
- // make sure we have completed.
- assert (ProcessState.isFinished(this.getState()));
- // let our process know that we've done our work.
- this.getProcess().instanceCompleted(this);
- }
-
- public void delete() {
- _processDao._instances.remove(_instanceId);
- }
-
- public Collection<ScopeDAO> getScopes() {
- return _scopes.values();
- }
-
- public EventsFirstLastCountTuple getEventsFirstLastCount() {
- EventsFirstLastCountTuple ret = new EventsFirstLastCountTuple();
- ret.count = _events.size();
- Date first = new Date();
- Date last = new Date(0);
- for (ProcessInstanceEvent event : _events) {
- if (event.getTimestamp().before(first)) first = event.getTimestamp();
- if (event.getTimestamp().after(last)) last = event.getTimestamp();
- }
- ret.first = first;
- ret.last = last;
- return ret;
- }
-
- public int getActivityFailureCount() {
- return _failureCount;
- }
-
- public Date getActivityFailureDateTime() {
- return _failureDateTime;
- }
-
- public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
- return _activityRecoveries.values();
- }
-
- public void createActivityRecovery(String channel, long activityId, String
reason, Date dateTime,
- Element data, String[] actions, int
retries) {
- _activityRecoveries.put(channel, new ActivityRecoveryDAOImpl(channel,
activityId, reason, dateTime, data, actions, retries));
- _failureCount = _activityRecoveries.size();
- _failureDateTime = dateTime;
- }
-
- public void deleteActivityRecovery(String channel) {
- _activityRecoveries.remove(channel);
- _failureCount = _activityRecoveries.size();
- }
-
- public long genMonotonic() {
- return ++_seq;
- }
-
- static class ActivityRecoveryDAOImpl implements ActivityRecoveryDAO {
-
- private long _activityId;
- private String _channel;
- private String _reason;
- private Element _details;
- private Date _dateTime;
- private String _actions;
- private int _retries;
-
- ActivityRecoveryDAOImpl(String channel, long activityId, String reason,
Date dateTime,
- Element details, String[] actions, int retries) {
- _activityId = activityId;
- _channel = channel;
- _reason = reason;
- _details = details;
- _dateTime = dateTime;
- _actions = actions[0];
- for (int i = 1; i < actions.length; ++i)
- _actions += " " + actions[i];
- _retries = retries;
- }
-
- public long getActivityId() {
- return _activityId;
- }
-
- public String getChannel() {
- return _channel;
- }
-
- public String getReason() {
- return _reason;
- }
-
- public Element getDetails() {
- return _details;
- }
-
- public Date getDateTime() {
- return _dateTime;
- }
-
- public String getActions() {
- return _actions;
- }
-
- public String[] getActionsList() {
- return _actions.split(" ");
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScope(java.lang.Long)
+ */
+ public ScopeDAO getScope(Long scopeInstanceId) {
+ return _scopes.get(scopeInstanceId);
}
- public int getRetries() {
- return _retries;
+ public List<ProcessInstanceEvent> getEvents(int idx, int count) {
+ int sidx = Math.max(idx, 0);
+ sidx = Math.min(sidx, _events.size() - 1);
+ int eidx = Math.min(sidx + count, _events.size());
+ return _events.subList(sidx, eidx);
}
- }
+ /**
+ * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#insertBpelEvent(org.apache.ode.bpel.evt.ProcessInstanceEvent)
+ */
+ public void insertBpelEvent(ProcessInstanceEvent event) {
+ _events.add(event);
+ }
+
+ public int getEventCount() {
+ return _events.size();
+ }
+
+ /**
+ * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
+ */
+ public CorrelatorDAO getInstantiatingCorrelator() {
+ return _instantiatingCorrelator;
+ }
+
+ /**
+ * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
+ */
+ public Collection<ScopeDAO> getScopes(String scopeName) {
+ List<ScopeDAO> scopes = _scopesByName.get(scopeName);
+ return (scopes == null ? EMPTY_SCOPE_DAOS : scopes);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
+ */
+ public short getPreviousState() {
+ return _previousState;
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
+ */
+ public Date getLastActiveTime() {
+ return _lastActive;
+ }
+
+ /**
+ * @see
org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
+ */
+ public void setLastActiveTime(Date dt) {
+ _lastActive = dt;
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
+ */
+ public void finishCompletion() {
+ // make sure we have completed.
+ assert (ProcessState.isFinished(this.getState()));
+ // let our process know that we've done our work.
+ this.getProcess().instanceCompleted(this);
+ }
+
+ public void delete() {
+ _processDao._instances.remove(_instanceId);
+ }
+
+ public Collection<ScopeDAO> getScopes() {
+ return _scopes.values();
+ }
+
+ public EventsFirstLastCountTuple getEventsFirstLastCount() {
+ EventsFirstLastCountTuple ret = new EventsFirstLastCountTuple();
+ ret.count = _events.size();
+ Date first = new Date();
+ Date last = new Date(0);
+ for (ProcessInstanceEvent event : _events) {
+ if (event.getTimestamp().before(first))
+ first = event.getTimestamp();
+ if (event.getTimestamp().after(last))
+ last = event.getTimestamp();
+ }
+ ret.first = first;
+ ret.last = last;
+ return ret;
+ }
+
+ public int getActivityFailureCount() {
+ return _failureCount;
+ }
+
+ public Date getActivityFailureDateTime() {
+ return _failureDateTime;
+ }
+
+ public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
+ return _activityRecoveries.values();
+ }
+
+ public void createActivityRecovery(String channel, long activityId, String
reason, Date dateTime, Element data,
+ String[] actions, int retries) {
+ _activityRecoveries
+ .put(channel, new ActivityRecoveryDAOImpl(channel, activityId,
reason, dateTime, data, actions, retries));
+ _failureCount = _activityRecoveries.size();
+ _failureDateTime = dateTime;
+ }
+
+ public void deleteActivityRecovery(String channel) {
+ _activityRecoveries.remove(channel);
+ _failureCount = _activityRecoveries.size();
+ }
+
+ public long genMonotonic() {
+ return ++_seq;
+ }
+
+ static class ActivityRecoveryDAOImpl implements ActivityRecoveryDAO {
+
+ private long _activityId;
+
+ private String _channel;
+
+ private String _reason;
+
+ private Element _details;
+
+ private Date _dateTime;
+
+ private String _actions;
+
+ private int _retries;
+
+ ActivityRecoveryDAOImpl(String channel, long activityId, String
reason, Date dateTime, Element details, String[] actions,
+ int retries) {
+ _activityId = activityId;
+ _channel = channel;
+ _reason = reason;
+ _details = details;
+ _dateTime = dateTime;
+ _actions = actions[0];
+ for (int i = 1; i < actions.length; ++i)
+ _actions += " " + actions[i];
+ _retries = retries;
+ }
+
+ public long getActivityId() {
+ return _activityId;
+ }
+
+ public String getChannel() {
+ return _channel;
+ }
+
+ public String getReason() {
+ return _reason;
+ }
+
+ public Element getDetails() {
+ return _details;
+ }
+
+ public Date getDateTime() {
+ return _dateTime;
+ }
+
+ public String getActions() {
+ return _actions;
+ }
+
+ public String[] getActionsList() {
+ return _actions.split(" ");
+ }
+
+ public int getRetries() {
+ return _retries;
+ }
+
+ }
+
+ void removeRoutes(String routeGroupId) {
+ for (CorrelatorDaoImpl correlator : _processDao._correlators.values())
+ correlator._removeRoutes(routeGroupId, this);
+
+ }
}