Author: mszefler
Date: Thu Dec 21 15:27:12 2006
New Revision: 489498
URL: http://svn.apache.org/viewvc?view=rev&rev=489498
Log:
Update the PMAPI to rely more on ProcessStore rather than the runtime db.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java?view=diff&rev=489498&r1=489497&r2=489498
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
Thu Dec 21 15:27:12 2006
@@ -19,9 +19,11 @@
package org.apache.ode.bpel.engine;
+import org.apache.commons.collections.comparators.ComparatorChain;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.Filter;
import org.apache.ode.bpel.common.InstanceFilter;
import org.apache.ode.bpel.common.ProcessFilter;
import org.apache.ode.bpel.dao.*;
@@ -37,58 +39,69 @@
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.pmapi.*;
+import org.apache.ode.daohib.bpel.hobj.HProcess;
import org.apache.ode.utils.ISO8601DateParser;
import org.apache.ode.utils.msg.MessageBundle;
import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.MemberOfFunction;
import org.apache.ode.utils.stl.UnaryFunction;
+import org.hibernate.Criteria;
+import org.hibernate.Session;
+import org.hibernate.criterion.Example;
+import org.hibernate.criterion.Property;
+import org.hibernate.criterion.Restrictions;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.namespace.QName;
import java.io.File;
+import java.text.ParseException;
import java.util.*;
+import java.util.regex.Pattern;
/**
* Implentation of the Process and InstanceManagement APIs.
*
- * TODO Pull up IM/PM methods from BpelManagementFacadeImpl
+ * @todo Move this out of the engine, it no longer belongs here.
*/
-public class ProcessAndInstanceManagementImpl
- implements InstanceManagement, ProcessManagement {
+public class ProcessAndInstanceManagementImpl implements InstanceManagement,
ProcessManagement {
protected static final Messages __msgs =
MessageBundle.getMessages(Messages.class);
+
protected static Log __log =
LogFactory.getLog(BpelManagementFacadeImpl.class);
+
protected static final ProcessStatusConverter __psc = new
ProcessStatusConverter();
+
protected BpelDatabase _db;
+
protected ProcessStore _store;
- protected Calendar _calendar = Calendar.getInstance(); // Calendar can be
expensive to initialize so we cache and clone it
+
+ protected Calendar _calendar = Calendar.getInstance(); // Calendar can be
+
+ // expensive to
+ // initialize so we
+ // cache and clone
+ // it
protected BpelServerImpl _server;
- public ProcessAndInstanceManagementImpl(BpelServer server,
- ProcessStore store) {
+ public ProcessAndInstanceManagementImpl(BpelServer server, ProcessStore
store) {
_server = (BpelServerImpl) server;
_db = _server._db;
_store = store;
}
- public ProcessInfoListDocument listProcessesCustom(String filter, String
orderKeys, final ProcessInfoCustomizer custom) {
+ public ProcessInfoListDocument listProcessesCustom(String filter, String
orderKeys,
+ final ProcessInfoCustomizer custom) {
ProcessInfoListDocument ret =
ProcessInfoListDocument.Factory.newInstance();
final TProcessInfoList procInfoList = ret.addNewProcessInfoList();
final ProcessFilter processFilter = new ProcessFilter(filter,
orderKeys);
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) {
- Collection<ProcessDAO> processes =
conn.processQuery(processFilter);
- for (ProcessDAO proc : processes) {
- fillProcessInfo(procInfoList.addNewProcessInfo(),
proc, custom);
- }
- return null;
- }
- });
+ for (ProcessConf pconf : processQuery(processFilter))
+ fillProcessInfo(procInfoList.addNewProcessInfo(), pconf,
custom);
} catch (Exception e) {
- throw new ProcessingException("Exception while listing
processes",e);
+ throw new ProcessingException("Exception while listing processes",
e);
}
return ret;
@@ -106,19 +119,20 @@
return genProcessInfoDocument(pid, custom);
}
-
public ProcessInfoDocument getProcessInfo(QName pid) {
return getProcessInfoCustom(pid, ProcessInfoCustomizer.ALL);
}
public ProcessInfoDocument activate(QName pid) {
- // TODO: Figure out how to deal with activation/retirement.
+ try {
+ _store.setState(pid, org.apache.ode.bpel.iapi.ProcessState.ACTIVE);
+ } catch (Exception ex) {
+ throw new ManagementException("Error setting process state.", ex);
+ }
return genProcessInfoDocument(pid, ProcessInfoCustomizer.NONE);
}
-
- public ProcessInfoDocument setRetired(final QName pid, final boolean
retired)
- throws ManagementException {
+ public ProcessInfoDocument setRetired(final QName pid, final boolean
retired) throws ManagementException {
try {
_store.setState(pid, retired ? ProcessState.RETIRED :
ProcessState.ACTIVE);
} catch (BpelEngineException e) {
@@ -132,20 +146,26 @@
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) throws Exception {
- ProcessDAO proc = conn.getProcess(pid);
- if (proc == null)
- throw new ProcessNotFoundException("ProcessNotFound:"
+ pid);
- _store.setProperty(pid, propertyName, value);
- fillProcessInfo(pi, proc, new
ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
- return null;
- }
- });
+ try {
+ _store.setProperty(pid, propertyName, value);
+ } catch (Exception ex) {
+ // Likely the process no longer exists in the store.
+ __log.debug("Error setting property value for " + pid + "; " +
propertyName);
+ }
+
+ // We have to do this after we set the property, since the
+ // ProcessConf object
+ // is immutable.
+ ProcessConf proc = _store.getProcessConfiguration(pid);
+ if (proc == null)
+ throw new ProcessNotFoundException("ProcessNotFound:" + pid);
+
+ fillProcessInfo(pi, proc, new
ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
+
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
- throw new ProcessingException("Exception while setting process
property",e);
+ throw new ProcessingException("Exception while setting process
property", e);
}
return ret;
@@ -156,20 +176,26 @@
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) throws Exception {
- ProcessDAO proc = conn.getProcess(pid);
- if (proc == null)
- throw new ProcessNotFoundException("ProcessNotFound:"
+ pid);
- _store.setProperty(pid, propertyName, value);
- fillProcessInfo(pi, proc, new
ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
- return null;
- }
- });
+ try {
+ _store.setProperty(pid, propertyName, value);
+ } catch (Exception ex) {
+ // Likely the process no longer exists in the store.
+ __log.debug("Error setting property value for " + pid + "; " +
propertyName);
+ }
+
+ // We have to do this after we set the property, since the
+ // ProcessConf object
+ // is immutable.
+ ProcessConf proc = _store.getProcessConfiguration(pid);
+ if (proc == null)
+ throw new ProcessNotFoundException("ProcessNotFound:" + pid);
+
+ fillProcessInfo(pi, proc, new
ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
+
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
- throw new ProcessingException("Exception while setting process
property",e);
+ throw new ProcessingException("Exception while setting process
property", e);
}
return ret;
@@ -191,7 +217,7 @@
}
});
} catch (Exception e) {
- throw new ProcessingException("Exception while listing
instances",e);
+ throw new ProcessingException("Exception while listing instances",
e);
}
return ret;
@@ -209,8 +235,6 @@
return genInstanceInfoDocument(iid);
}
-
-
public ScopeInfoDocument getScopeInfo(String siid) {
return getScopeInfoWithActivity(siid, false);
}
@@ -219,12 +243,11 @@
return genScopeInfoDocument(siid, includeActivityInfo);
}
- public VariableInfoDocument getVariableInfo(final String scopeId, final
String varName)
- throws ManagementException {
+ public VariableInfoDocument getVariableInfo(final String scopeId, final
String varName) throws ManagementException {
VariableInfoDocument ret = VariableInfoDocument.Factory.newInstance();
final TVariableInfo vinf = ret.addNewVariableInfo();
final TVariableRef sref = vinf.addNewSelf();
- dbexec(new BpelDatabase.Callable<Object>() {
+ dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
ScopeDAO scope = session.getScope(new Long(scopeId));
if (scope == null) {
@@ -233,7 +256,7 @@
sref.setSiid(scopeId);
sref.setIid(scope.getProcessInstance().getInstanceId().toString());
- sref.setName(varName );
+ sref.setName(varName);
XmlDataDAO var = scope.getVariable(varName);
if (var == null) {
@@ -243,8 +266,7 @@
Node nval = var.get();
if (nval != null) {
TVariableInfo.Value val = vinf.addNewValue();
- val.getDomNode().appendChild(
-
val.getDomNode().getOwnerDocument().importNode(nval,true));
+
val.getDomNode().appendChild(val.getDomNode().getOwnerDocument().importNode(nval,
true));
}
return null;
}
@@ -260,7 +282,6 @@
return genInstanceInfoDocument(iid);
}
-
public InstanceInfoDocument resume(final Long iid) {
// We need debugger support in order to resume (since we have to force
// a reduction. If one is not available the getDebugger() method should
@@ -270,8 +291,7 @@
return genInstanceInfoDocument(iid);
}
- public InstanceInfoDocument suspend(final Long iid)
- throws ManagementException {
+ public InstanceInfoDocument suspend(final Long iid) throws
ManagementException {
DebuggerSupport debugSupport = getDebugger(iid);
assert debugSupport != null : "getDebugger(Long) returned NULL!";
debugSupport.suspend(iid);
@@ -294,9 +314,10 @@
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
return null;
- for (ActivityRecoveryDAO recovery:
instance.getActivityRecoveries()) {
+ for (ActivityRecoveryDAO recovery :
instance.getActivityRecoveries()) {
if (recovery.getActivityId() == aid) {
- BpelProcess process =
_server._engine._activeProcesses.get(instance.getProcess().getProcessId());
+ BpelProcess process =
_server._engine._activeProcesses.get(instance.getProcess()
+ .getProcessId());
if (process != null) {
process.recoverActivity(instance,
recovery.getChannel(), aid, action, null);
break;
@@ -321,14 +342,14 @@
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) {
Collection<ProcessInstanceDAO> instances =
conn.instanceQuery(instanceFilter);
- for (ProcessInstanceDAO instance :instances) {
+ for (ProcessInstanceDAO instance : instances) {
instance.delete();
}
return null;
}
});
} catch (Exception e) {
- throw new ProcessingException("Exception during instance
deletion",e);
+ throw new ProcessingException("Exception during instance
deletion", e);
}
return ret;
@@ -338,17 +359,17 @@
// EVENT RETRIEVAL
//
public List<String> getEventTimeline(String instanceFilter, String
eventFilter) {
- final InstanceFilter ifilter = new
InstanceFilter(instanceFilter,null,0);
- final BpelEventFilter efilter = new BpelEventFilter(eventFilter,0);
+ final InstanceFilter ifilter = new InstanceFilter(instanceFilter,
null, 0);
+ final BpelEventFilter efilter = new BpelEventFilter(eventFilter, 0);
- List<Date> tline = dbexec(new BpelDatabase.Callable<List<Date>>() {
+ List<Date> tline = dbexec(new BpelDatabase.Callable<List<Date>>() {
public List<Date> run(BpelDAOConnection session) throws Exception {
return session.bpelEventTimelineQuery(ifilter, efilter);
}
});
ArrayList<String> ret = new ArrayList<String>(tline.size());
- CollectionsX.transform(ret,tline,new UnaryFunction<Date,String>() {
+ CollectionsX.transform(ret, tline, new UnaryFunction<Date, String>() {
public String apply(Date x) {
return ISO8601DateParser.format(x);
}
@@ -357,11 +378,11 @@
}
public EventInfoListDocument listEvents(String instanceFilter, String
eventFilter, int maxCount) {
- final InstanceFilter ifilter = new
InstanceFilter(instanceFilter,null,0);
- final BpelEventFilter efilter = new
BpelEventFilter(eventFilter,maxCount);
+ final InstanceFilter ifilter = new InstanceFilter(instanceFilter,
null, 0);
+ final BpelEventFilter efilter = new BpelEventFilter(eventFilter,
maxCount);
EventInfoListDocument eid =
EventInfoListDocument.Factory.newInstance();
final TEventInfoList eil = eid.addNewEventInfoList();
- dbexec(new BpelDatabase.Callable<Object>() {
+ dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
List<BpelEvent> events = session.bpelEventQuery(ifilter,
efilter);
for (BpelEvent event : events) {
@@ -384,11 +405,11 @@
if (obase != null && obase.debugInfo != null &&
obase.debugInfo.extensibilityElements != null) {
for (Map.Entry<QName, Object> entry :
obase.debugInfo.extensibilityElements.entrySet()) {
TActivityExtInfo taei = taeil.addNewActivityExtInfo();
- taei.setAiid(""+aid);
+ taei.setAiid("" + aid);
Object extValue = entry.getValue();
if (extValue instanceof Element)
- taei.getDomNode().appendChild(taei.getDomNode()
- .getOwnerDocument().importNode((Element)
extValue, true));
+ taei.getDomNode().appendChild(
+
taei.getDomNode().getOwnerDocument().importNode((Element) extValue, true));
else if (extValue instanceof String) {
Element valueElmt =
taei.getDomNode().getOwnerDocument().createElementNS(
entry.getKey().getNamespaceURI(),
entry.getKey().getLocalPart());
@@ -402,10 +423,12 @@
}
/**
- * Get the {@link DebuggerSupport} object for the given process
identifier. Debugger
- * support is required for operations that resume execution in some way or
manipulate
- * the breakpoints.
- * @param procid process identifier
+ * Get the {@link DebuggerSupport} object for the given process
identifier.
+ * Debugger support is required for operations that resume execution in
some
+ * way or manipulate the breakpoints.
+ *
+ * @param procid
+ * process identifier
* @return associated debugger support object
* @throws ManagementException
*/
@@ -413,17 +436,18 @@
BpelProcess process = _server._engine._activeProcesses.get(procid);
if (process == null)
- throw new InvalidRequestException("The process \"" + procid + "\"
is available." );
+ throw new InvalidRequestException("The process \"" + procid + "\"
is available.");
return process._debugger;
}
-
/**
- * Get the {@link DebuggerSupport} object for the given
instance identifier. Debugger
- * support is required for operations that resume execution in some way or
manipulate
- * the breakpoints.
- * @param iid instance identifier
+ * Get the {@link DebuggerSupport} object for the given
instance identifier.
+ * Debugger support is required for operations that resume execution in
some
+ * way or manipulate the breakpoints.
+ *
+ * @param iid
+ * instance identifier
* @return associated debugger support object
* @throws ManagementException
*/
@@ -445,8 +469,11 @@
}
/**
- * Execute a database transaction, unwrapping nested {@link
ManagementException}s.
- * @param runnable action to run
+ * Execute a database transaction, unwrapping nested
+ * {@link ManagementException}s.
+ *
+ * @param runnable
+ * action to run
* @return
* @throws ManagementException
*/
@@ -461,8 +488,11 @@
}
/**
- * Execute a database transaction, unwrapping nested {@link
ManagementException}s.
- * @param callable action to run
+ * Execute a database transaction, unwrapping nested
+ * {@link ManagementException}s.
+ *
+ * @param callable
+ * action to run
* @return
* @throws ManagementException
*/
@@ -473,7 +503,7 @@
// Passthrough.
throw me;
} catch (Exception ex) {
- throw new ManagementException("Exception during database
operation",ex);
+ throw new ManagementException("Exception during database
operation", ex);
}
}
@@ -482,19 +512,14 @@
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) {
- ProcessDAO proc = conn.getProcess(procid);
- if (proc == null)
- throw new InvalidRequestException("ProcessNotFound:" +
procid);
- fillProcessInfo(pi, proc, custom);
- return null;
- }
- });
+ ProcessConf pconf = _store.getProcessConfiguration(procid);
+ if (pconf == null)
+ throw new ProcessNotFoundException("ProcessNotFound:" +
procid);
+ fillProcessInfo(pi, pconf, custom);
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
- throw new ProcessingException("Exception while retrieving process
information",e);
+ throw new ProcessingException("Exception while retrieving process
information", e);
}
return ret;
@@ -503,7 +528,9 @@
/**
* Generate a {@link InstanceInfoDocument} for a given
instance. This
* document contains general information about the instance.
- * @param iid instance identifier
+ *
+ * @param iid
+ * instance identifier
* @return generated document
*/
private InstanceInfoDocument genInstanceInfoDocument(final Long iid) {
@@ -521,18 +548,19 @@
if (instance == null)
throw new InstanceNotFoundException("" + iid);
// TODO: deal with "ERROR" state information.
- fillInstanceInfo(ii,instance);
+ fillInstanceInfo(ii, instance);
return null;
}
});
-
return ret;
}
/**
* Generate a {@link ScopeInfoDocument} for a given scope
instance.
- * @param siid scope instance identifier
+ *
+ * @param siid
+ * scope instance identifier
* @param includeActivityInfo
* @return generated document
*/
@@ -557,7 +585,7 @@
if (instance == null)
throw new InvalidRequestException("Scope not found: " +
siidl);
// TODO: deal with "ERROR" state information.
- fillScopeInfo(ii,instance,includeActivityInfo);
+ fillScopeInfo(ii, instance, includeActivityInfo);
return null;
}
});
@@ -566,36 +594,45 @@
/**
* Fill in the <code>process-info</code> element of the transfer object.
- * @param info destination XMLBean
- * @param proc source DAO object
- * @param custom used to customize the quantity of information produced in
the info
- */
- private void fillProcessInfo(TProcessInfo info, ProcessDAO proc,
ProcessInfoCustomizer custom) {
- ProcessConf pconf =
_store.getProcessConfiguration(proc.getProcessId());
- info.setPid(proc.getProcessId().toString());
+ *
+ * @param info
+ * destination XMLBean
+ * @param pconf
+ * process configuration object (from store)
+ * @param proc
+ * source DAO object
+ * @param custom
+ * used to customize the quantity of information produced in the
+ * info
+ */
+ private void fillProcessInfo(TProcessInfo info, ProcessConf pconf,
ProcessInfoCustomizer custom) {
+ if (pconf == null)
+ throw new IllegalArgumentException("Null pconf.");
+
+ info.setPid(pconf.getProcessId().toString());
// TODO: ACTIVE and RETIRED should be used separately.
- //Active process may be retired at the same time
- if(pconf.getState() == ProcessState.RETIRED) {
+ // Active process may be retired at the same time
+ if (pconf.getState() == ProcessState.RETIRED) {
info.setStatus(TProcessStatus.RETIRED);
} else {
info.setStatus(TProcessStatus.ACTIVE);
}
TDefinitionInfo definfo = info.addNewDefinitionInfo();
- definfo.setProcessName(proc.getType());
+ definfo.setProcessName(pconf.getType());
TDeploymentInfo depinfo = info.addNewDeploymentInfo();
depinfo.setDeployDate(toCalendar(pconf.getDeployDate()));
depinfo.setDeployer(pconf.getDeployer());
if (custom.includeInstanceSummary()) {
TInstanceSummary isum = info.addNewInstanceSummary();
-
genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.ACTIVE, proc);
-
genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.COMPLETED, proc);
-
genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.ERROR, proc);
-
genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.FAILED, proc);
-
genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.SUSPENDED, proc);
-
genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.TERMINATED,
proc);
- getInstanceSummaryActivityFailure(isum, proc);
+ genInstanceSummaryEntry(isum.addNewInstances(),
TInstanceStatus.ACTIVE, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(),
TInstanceStatus.COMPLETED, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(),
TInstanceStatus.ERROR, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(),
TInstanceStatus.FAILED, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(),
TInstanceStatus.SUSPENDED, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(),
TInstanceStatus.TERMINATED, pconf);
+ getInstanceSummaryActivityFailure(isum, pconf);
}
TProcessInfo.Documents docinfo = info.addNewDocuments();
@@ -617,15 +654,16 @@
}
}
- OProcess oprocess = _server._engine.getOProcess(proc.getProcessId());
+ OProcess oprocess = _server._engine.getOProcess(pconf.getProcessId());
if (custom.includeEndpoints() && oprocess != null) {
TEndpointReferences eprs = info.addNewEndpoints();
for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) {
if (oplink.hasPartnerRole() && oplink.initializePartnerRole) {
- EndpointReference pepr =
_server._engine._activeProcesses.get(proc.getProcessId())
+ // TODO: this is very uncool.
+ EndpointReference pepr =
_server._engine._activeProcesses.get(pconf.getProcessId())
.getInitialPartnerRoleEPR(oplink);
-
- if (pepr!= null) {
+
+ if (pepr != null) {
TEndpointReferences.EndpointRef epr =
eprs.addNewEndpointRef();
Document eprNodeDoc =
epr.getDomNode().getOwnerDocument();
epr.getDomNode().appendChild(eprNodeDoc.importNode(pepr.toXML().getDocumentElement(),
true));
@@ -634,16 +672,20 @@
}
}
- //TODO: add documents to the above data structure.
+ // TODO: add documents to the above data structure.
}
/**
* Generate document information elements for a set of files.
- * @param docinfo target element
- * @param files files
- * @param recurse recurse down directories?
+ *
+ * @param docinfo
+ * target element
+ * @param files
+ * files
+ * @param recurse
+ * recurse down directories?
*/
- private void genDocumentInfo(TProcessInfo.Documents docinfo, File[]
files,boolean recurse) {
+ private void genDocumentInfo(TProcessInfo.Documents docinfo, File[] files,
boolean recurse) {
if (files == null)
return;
for (File f : files) {
@@ -654,7 +696,7 @@
if (recurse)
genDocumentInfo(docinfo, f.listFiles(), true);
} else if (f.isFile()) {
- genDocumentInfo(docinfo,f);
+ genDocumentInfo(docinfo, f);
}
}
}
@@ -670,42 +712,54 @@
}
}
- private void genInstanceSummaryEntry(TInstanceSummary.Instances instances,
TInstanceStatus.Enum state, ProcessDAO proc) {
+ private void genInstanceSummaryEntry(TInstanceSummary.Instances instances,
TInstanceStatus.Enum state,
+ ProcessConf pconf) {
instances.setState(state);
String queryStatus =
InstanceFilter.StatusKeys.valueOf(state.toString()).toString().toLowerCase();
- InstanceFilter instanceFilter = new InstanceFilter("status=" +
queryStatus
- + " name=" + proc.getType().getLocalPart()
- + " namespace=" + proc.getType().getNamespaceURI());
- int count = _db.getConnection().instanceQuery(instanceFilter).size();
+ final InstanceFilter instanceFilter = new InstanceFilter("status=" +
queryStatus + " name="
+ + pconf.getProcessId().getLocalPart() + " namespace=" +
pconf.getProcessId().getNamespaceURI());
+ int count = dbexec(new BpelDatabase.Callable<Integer>() {
+
+ public Integer run(BpelDAOConnection conn) throws Exception {
+ return conn.instanceQuery(instanceFilter).size();
+ }
+ });
instances.setCount(count);
}
- private void getInstanceSummaryActivityFailure(TInstanceSummary summary,
ProcessDAO proc) {
- String queryStatus =
InstanceFilter.StatusKeys.valueOf(TInstanceStatus.ACTIVE.toString()).toString().toLowerCase();
- InstanceFilter instanceFilter = new InstanceFilter("status=" +
queryStatus
- + " name=" + proc.getType().getLocalPart()
- + " namespace=" + proc.getType().getNamespaceURI());
- int failureInstances = 0;
- Date lastFailureDt = null;
- for (ProcessInstanceDAO instance :
_db.getConnection().instanceQuery(instanceFilter)) {
- int count = instance.getActivityFailureCount();
- if (count > 0) {
- ++failureInstances;
- Date failureDt = instance.getActivityFailureDateTime();
- if (lastFailureDt == null || lastFailureDt.before(failureDt))
- lastFailureDt = failureDt;
- }
- }
- if (failureInstances > 0) {
- TFailuresInfo failures = summary.addNewFailures();
- failures.setDtFailure(toCalendar(lastFailureDt));
- failures.setCount(failureInstances);
- }
+ private void getInstanceSummaryActivityFailure(final TInstanceSummary
summary, ProcessConf pconf) {
+ String queryStatus =
InstanceFilter.StatusKeys.valueOf(TInstanceStatus.ACTIVE.toString()).toString()
+ .toLowerCase();
+ final InstanceFilter instanceFilter = new InstanceFilter("status=" +
queryStatus + " name="
+ + pconf.getProcessId().getLocalPart() + " namespace=" +
pconf.getProcessId().getNamespaceURI());
+ dbexec(new BpelDatabase.Callable<Void>() {
+
+ public Void run(BpelDAOConnection conn) throws Exception {
+ Date lastFailureDt = null;
+ int failureInstances = 0;
+ for (ProcessInstanceDAO instance :
conn.instanceQuery(instanceFilter)) {
+ int count = instance.getActivityFailureCount();
+ if (count > 0) {
+ ++failureInstances;
+ Date failureDt = instance.getActivityFailureDateTime();
+ if (lastFailureDt == null ||
lastFailureDt.before(failureDt))
+ lastFailureDt = failureDt;
+ }
+ }
+ if (failureInstances > 0) {
+ TFailuresInfo failures = summary.addNewFailures();
+ failures.setDtFailure(toCalendar(lastFailureDt));
+ failures.setCount(failureInstances);
+ }
+
+ return null;
+ }
+
+ });
}
private void fillInstanceInfo(TInstanceInfo info, ProcessInstanceDAO
instance) {
info.setIid("" + instance.getInstanceId());
- // TODO: add process QName to instance-info schema
ProcessDAO processDAO = instance.getProcess();
info.setPid(processDAO.getProcessId().toString());
info.setProcessName(processDAO.getType());
@@ -731,7 +785,7 @@
for (CorrelationSetDAO correlationSetDAO :
instance.getCorrelationSets()) {
for (Map.Entry<QName, String> property :
correlationSetDAO.getProperties().entrySet()) {
TCorrelationProperty tproperty =
corrProperties.addNewCorrelationProperty();
-
tproperty.setCsetid(""+correlationSetDAO.getCorrelationSetId());
+ tproperty.setCsetid("" +
correlationSetDAO.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
@@ -743,9 +797,9 @@
eventInfo.setCount(flc.count);
if (instance.getActivityFailureCount() > 0) {
- TFailuresInfo failures = info.addNewFailures();
-
failures.setDtFailure(toCalendar(instance.getActivityFailureDateTime()));
- failures.setCount(instance.getActivityFailureCount());
+ TFailuresInfo failures = info.addNewFailures();
+
failures.setDtFailure(toCalendar(instance.getActivityFailureDateTime()));
+ failures.setCount(instance.getActivityFailureCount());
}
}
@@ -769,13 +823,12 @@
if (!scope.getCorrelationSets().isEmpty()) {
TScopeInfo.CorrelationSets correlationSets =
scopeInfo.addNewCorrelationSets();
for (CorrelationSetDAO correlationSetDAO :
scope.getCorrelationSets()) {
- TScopeInfo.CorrelationSets.CorrelationSet correlationSet =
- correlationSets.addNewCorrelationSet();
-
correlationSet.setCsetid(""+correlationSetDAO.getCorrelationSetId());
+ TScopeInfo.CorrelationSets.CorrelationSet correlationSet =
correlationSets.addNewCorrelationSet();
+ correlationSet.setCsetid("" +
correlationSetDAO.getCorrelationSetId());
correlationSet.setName(correlationSetDAO.getName());
for (Map.Entry<QName, String> property :
correlationSetDAO.getProperties().entrySet()) {
TCorrelationProperty tproperty =
correlationSet.addNewCorrelationProperty();
-
tproperty.setCsetid(""+correlationSetDAO.getCorrelationSetId());
+ tproperty.setCsetid("" +
correlationSetDAO.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
@@ -784,22 +837,23 @@
}
if (includeActivityInfo) {
- Collection<ActivityRecoveryDAO> recoveries =
scope.getProcessInstance().getActivityRecoveries();
+ Collection<ActivityRecoveryDAO> recoveries =
scope.getProcessInstance().getActivityRecoveries();
TScopeInfo.Activities activities = scopeInfo.addNewActivities();
List<BpelEvent> events = scope.listEvents(null);
ActivityStateDocumentBuilder b = new
ActivityStateDocumentBuilder();
- for (BpelEvent e : events) b.onEvent(e);
+ for (BpelEvent e : events)
+ b.onEvent(e);
for (ActivityInfoDocument ai : b.getActivities()) {
for (ActivityRecoveryDAO recovery : recoveries) {
- if
(String.valueOf(recovery.getActivityId()).equals(ai.getActivityInfo().getAiid()))
{
- TFailureInfo failure =
ai.getActivityInfo().addNewFailure();
- failure.setReason(recovery.getReason());
- failure.setDtFailure(toCalendar(recovery.getDateTime()));
- failure.setActions(recovery.getActions());
- failure.setRetries(recovery.getRetries());
- ai.getActivityInfo().setStatus(TActivityStatus.FAILURE);
- }
+ if
(String.valueOf(recovery.getActivityId()).equals(ai.getActivityInfo().getAiid()))
{
+ TFailureInfo failure =
ai.getActivityInfo().addNewFailure();
+ failure.setReason(recovery.getReason());
+
failure.setDtFailure(toCalendar(recovery.getDateTime()));
+ failure.setActions(recovery.getActions());
+ failure.setRetries(recovery.getRetries());
+
ai.getActivityInfo().setStatus(TActivityStatus.FAILURE);
+ }
}
activities.addNewActivityInfo().set(ai.getActivityInfo());
}
@@ -847,87 +901,89 @@
info.setLineNumber(event.getLineNo());
info.setTimestamp(toCalendar(event.getTimestamp()));
if (event instanceof ActivityEvent) {
- info.setActivityName(((ActivityEvent)event).getActivityName());
- info.setActivityId(((ActivityEvent)event).getActivityId());
- info.setActivityType(((ActivityEvent)event).getActivityType());
-
info.setActivityDefinitionId(((ActivityEvent)event).getActivityDeclarationId());
+ info.setActivityName(((ActivityEvent) event).getActivityName());
+ info.setActivityId(((ActivityEvent) event).getActivityId());
+ info.setActivityType(((ActivityEvent) event).getActivityType());
+ info.setActivityDefinitionId(((ActivityEvent)
event).getActivityDeclarationId());
}
if (event instanceof CorrelationEvent) {
- info.setPortType(((CorrelationEvent)event).getPortType());
- info.setOperation(((CorrelationEvent)event).getOperation());
- info.setMexId(((CorrelationEvent)event).getMessageExchangeId());
+ info.setPortType(((CorrelationEvent) event).getPortType());
+ info.setOperation(((CorrelationEvent) event).getOperation());
+ info.setMexId(((CorrelationEvent) event).getMessageExchangeId());
}
if (event instanceof CorrelationMatchEvent) {
- info.setPortType(((CorrelationMatchEvent)event).getPortType());
+ info.setPortType(((CorrelationMatchEvent) event).getPortType());
}
if (event instanceof CorrelationSetEvent) {
-
info.setCorrelationSet(((CorrelationSetEvent)event).getCorrelationSetName());
+ info.setCorrelationSet(((CorrelationSetEvent)
event).getCorrelationSetName());
}
if (event instanceof CorrelationSetWriteEvent) {
-
info.setCorrelationKey(((CorrelationSetWriteEvent)event).getCorrelationSetName());
+ info.setCorrelationKey(((CorrelationSetWriteEvent)
event).getCorrelationSetName());
}
if (event instanceof ExpressionEvaluationEvent) {
-
info.setExpression(((ExpressionEvaluationEvent)event).getExpression());
+ info.setExpression(((ExpressionEvaluationEvent)
event).getExpression());
}
if (event instanceof ExpressionEvaluationFailedEvent) {
- info.setFault(((ExpressionEvaluationFailedEvent)event).getFault());
+ info.setFault(((ExpressionEvaluationFailedEvent)
event).getFault());
}
if (event instanceof NewProcessInstanceEvent) {
- if ((((NewProcessInstanceEvent)event).getRootScopeId()) != null)
-
info.setRootScopeId(((NewProcessInstanceEvent)event).getRootScopeId());
-
info.setScopeDefinitionId(((NewProcessInstanceEvent)event).getScopeDeclarationId());
+ if ((((NewProcessInstanceEvent) event).getRootScopeId()) != null)
+ info.setRootScopeId(((NewProcessInstanceEvent)
event).getRootScopeId());
+ info.setScopeDefinitionId(((NewProcessInstanceEvent)
event).getScopeDeclarationId());
}
if (event instanceof PartnerLinkEvent) {
- info.setPartnerLinkName(((PartnerLinkEvent)event).getpLinkName());
+ info.setPartnerLinkName(((PartnerLinkEvent) event).getpLinkName());
}
if (event instanceof ProcessCompletionEvent) {
- info.setFault(((ProcessCompletionEvent)event).getFault());
+ info.setFault(((ProcessCompletionEvent) event).getFault());
}
if (event instanceof ProcessEvent) {
- info.setProcessId(((ProcessEvent)event).getProcessId());
- info.setProcessType(((ProcessEvent)event).getProcessName());
+ info.setProcessId(((ProcessEvent) event).getProcessId());
+ info.setProcessType(((ProcessEvent) event).getProcessName());
}
if (event instanceof ProcessInstanceEvent) {
-
info.setInstanceId(((ProcessInstanceEvent)event).getProcessInstanceId());
+ info.setInstanceId(((ProcessInstanceEvent)
event).getProcessInstanceId());
}
if (event instanceof ProcessInstanceStartedEvent) {
-
info.setRootScopeId(((ProcessInstanceStartedEvent)event).getRootScopeId());
-
info.setRootScopeDeclarationId(((ProcessInstanceStartedEvent)event).getScopeDeclarationId());
+ info.setRootScopeId(((ProcessInstanceStartedEvent)
event).getRootScopeId());
+ info.setRootScopeDeclarationId(((ProcessInstanceStartedEvent)
event).getScopeDeclarationId());
}
if (event instanceof ProcessInstanceStateChangeEvent) {
-
info.setOldState(((ProcessInstanceStateChangeEvent)event).getOldState());
-
info.setNewState(((ProcessInstanceStateChangeEvent)event).getNewState());
+ info.setOldState(((ProcessInstanceStateChangeEvent)
event).getOldState());
+ info.setNewState(((ProcessInstanceStateChangeEvent)
event).getNewState());
}
if (event instanceof ProcessMessageExchangeEvent) {
-
info.setPortType(((ProcessMessageExchangeEvent)event).getPortType());
-
info.setOperation(((ProcessMessageExchangeEvent)event).getOperation());
-
info.setMexId(((ProcessMessageExchangeEvent)event).getMessageExchangeId());
+ info.setPortType(((ProcessMessageExchangeEvent)
event).getPortType());
+ info.setOperation(((ProcessMessageExchangeEvent)
event).getOperation());
+ info.setMexId(((ProcessMessageExchangeEvent)
event).getMessageExchangeId());
}
if (event instanceof ScopeCompletionEvent) {
- info.setSuccess(((ScopeCompletionEvent)event).isSuccess());
- info.setFault(((ScopeCompletionEvent)event).getFault());
+ info.setSuccess(((ScopeCompletionEvent) event).isSuccess());
+ info.setFault(((ScopeCompletionEvent) event).getFault());
}
if (event instanceof ScopeEvent) {
- info.setScopeId(((ScopeEvent)event).getScopeId());
- if (((ScopeEvent)event).getParentScopeId() != null)
- info.setParentScopeId(((ScopeEvent)event).getParentScopeId());
- if (((ScopeEvent)event).getScopeName() != null)
- info.setScopeName(((ScopeEvent)event).getScopeName());
-
info.setScopeDefinitionId(((ScopeEvent)event).getScopeDeclarationId());
+ info.setScopeId(((ScopeEvent) event).getScopeId());
+ if (((ScopeEvent) event).getParentScopeId() != null)
+ info.setParentScopeId(((ScopeEvent) event).getParentScopeId());
+ if (((ScopeEvent) event).getScopeName() != null)
+ info.setScopeName(((ScopeEvent) event).getScopeName());
+ info.setScopeDefinitionId(((ScopeEvent)
event).getScopeDeclarationId());
}
if (event instanceof ScopeFaultEvent) {
- info.setFault(((ScopeFaultEvent)event).getFaultType());
- info.setFaultLineNumber(((ScopeFaultEvent)event).getFaultLineNo());
- info.setExplanation(((ScopeFaultEvent)event).getExplanation());
+ info.setFault(((ScopeFaultEvent) event).getFaultType());
+ info.setFaultLineNumber(((ScopeFaultEvent)
event).getFaultLineNo());
+ info.setExplanation(((ScopeFaultEvent) event).getExplanation());
}
if (event instanceof VariableEvent) {
- info.setVariableName(((VariableEvent)event).getVarName());
+ info.setVariableName(((VariableEvent) event).getVarName());
}
}
/**
* Convert a {@link Date} to a {@link Calendar}.
- * @param dtime a {@link Date}
+ *
+ * @param dtime
+ * a {@link Date}
* @return a {@link Calendar}
*/
private Calendar toCalendar(Date dtime) {
@@ -939,7 +995,6 @@
return c;
}
-
/**
* @see
org.apache.ode.bpel.pmapi.InstanceManagement#queryInstances(java.lang.String)
*/
@@ -958,10 +1013,155 @@
}
});
} catch (Exception e) {
- throw new ProcessingException("Exception while querying
instances",e);
+ throw new ProcessingException("Exception while querying
instances", e);
}
return ret;
}
+    /**
+     * Query processes based on a {@link ProcessFilter} criteria. This is
+     * implemented in memory rather than via database calls since the processes
+     * are managed by the {@link ProcessStore} object and we don't want to make
+     * this needlessly complicated.
+     *
+     * <p>
+     * The process ids are first narrowed by the name and namespace regular
+     * expressions, then the remaining configurations are narrowed by the
+     * deployment-date constraints and finally sorted by the requested order
+     * keys (<code>name</code>, <code>namespace</code>, <code>version</code>,
+     * <code>deployed</code>, each optionally prefixed with <code>+</code> or
+     * <code>-</code>). Unrecognized order keys are logged and ignored.
+     *
+     * @param filter
+     *            selection and ordering criteria; <code>null</code> selects
+     *            every process, in the order the store returns them
+     * @return the configurations of the matching processes; a process whose
+     *         configuration can no longer be obtained from the store is left
+     *         out
+     */
+    @SuppressWarnings("unchecked")
+    Collection<ProcessConf> processQuery(ProcessFilter filter) {
+
+        // NOTE(review): remove_if below edits this list in place; confirm
+        // that the store hands out a private copy.
+        List<QName> pids = _store.getProcesses();
+
+        // Name filter can be implemented using only the PIDs.
+        if (filter != null && filter.getNameFilter() != null) {
+            final Pattern pattern = Pattern.compile(filter.getNameFilter());
+            CollectionsX.remove_if(pids, new MemberOfFunction<QName>() {
+                @Override
+                public boolean isMember(QName o) {
+                    return !pattern.matcher(o.getLocalPart()).matches();
+                }
+            });
+        }
+
+        // Namespace filter likewise only needs the PIDs. The pattern must be
+        // built from the namespace filter itself, not from the name filter.
+        if (filter != null && filter.getNamespaceFilter() != null) {
+            final Pattern pattern = Pattern.compile(filter.getNamespaceFilter());
+            CollectionsX.remove_if(pids, new MemberOfFunction<QName>() {
+                @Override
+                public boolean isMember(QName o) {
+                    String ns = o.getNamespaceURI() == null ? "" : o.getNamespaceURI();
+                    return !pattern.matcher(ns).matches();
+                }
+
+            });
+        }
+
+        // Now we need the process conf objects, we need to be
+        // careful since someone could have deleted them by now:
+        // a missing configuration is skipped rather than added as null,
+        // which would break the date filter and the comparators below.
+        List<ProcessConf> confs = new LinkedList<ProcessConf>();
+        for (QName pid : pids) {
+            ProcessConf pconf = _store.getProcessConfiguration(pid);
+            if (pconf != null)
+                confs.add(pconf);
+        }
+
+        if (filter != null) {
+            // TODO Implement process status filtering when status will exist
+            // Specific filter for deployment date.
+            if (filter.getDeployedDateFilter() != null) {
+                for (final String ddf : filter.getDeployedDateFilter()) {
+                    final Date dd;
+                    try {
+                        dd = ISO8601DateParser.parse(Filter.getDateWithoutOp(ddf));
+                    } catch (ParseException e) {
+                        // Should never happen.
+                        throw new RuntimeException(e);
+                    }
+
+                    final long wanted = dd.getTime();
+                    CollectionsX.remove_if(confs, new MemberOfFunction<ProcessConf>() {
+                        @Override
+                        public boolean isMember(ProcessConf o) {
+                            // Returning true removes the configuration, so
+                            // every test is the negation of the operator.
+                            // All operators compare epoch milliseconds so
+                            // that "=" does not depend on the Date subclass.
+                            long deployed = o.getDeployDate().getTime();
+
+                            if (ddf.startsWith("="))
+                                return deployed != wanted;
+
+                            // Two-character operators are tested before
+                            // their one-character prefixes.
+                            if (ddf.startsWith("<="))
+                                return deployed > wanted;
+
+                            if (ddf.startsWith(">="))
+                                return deployed < wanted;
+
+                            if (ddf.startsWith("<"))
+                                return deployed >= wanted;
+
+                            if (ddf.startsWith(">"))
+                                return deployed <= wanted;
+
+                            return false;
+                        }
+
+                    });
+
+                }
+            }
+
+            // Ordering
+            if (filter.getOrders() != null) {
+                ComparatorChain cchain = new ComparatorChain();
+                for (String key : filter.getOrders()) {
+                    boolean ascending = true;
+                    String orderKey = key;
+                    if (key.startsWith("+") || key.startsWith("-")) {
+                        orderKey = key.substring(1, key.length());
+                        if (key.startsWith("-"))
+                            ascending = false;
+                    }
+
+                    Comparator<ProcessConf> c;
+                    if ("name".equals(orderKey))
+                        c = new Comparator<ProcessConf>() {
+                            public int compare(ProcessConf o1, ProcessConf o2) {
+                                return o1.getProcessId().getLocalPart().compareTo(o2.getProcessId().getLocalPart());
+                            }
+                        };
+                    else if ("namespace".equals(orderKey))
+                        c = new Comparator<ProcessConf>() {
+                            public int compare(ProcessConf o1, ProcessConf o2) {
+                                String ns1 = o1.getProcessId().getNamespaceURI() == null ? "" : o1.getProcessId()
+                                        .getNamespaceURI();
+                                String ns2 = o2.getProcessId().getNamespaceURI() == null ? "" : o2.getProcessId()
+                                        .getNamespaceURI();
+                                return ns1.compareTo(ns2);
+                            }
+                        };
+                    else if ("version".equals(orderKey))
+                        c = new Comparator<ProcessConf>() {
+                            public int compare(ProcessConf o1, ProcessConf o2) {
+                                // TODO: implement version comparisons.
+                                return 0;
+                            }
+                        };
+                    else if ("deployed".equals(orderKey))
+                        c = new Comparator<ProcessConf>() {
+                            public int compare(ProcessConf o1, ProcessConf o2) {
+                                return o1.getDeployDate().compareTo(o2.getDeployDate());
+                            }
+
+                        };
+
+                    else {
+                        // unrecognized
+                        __log.debug("unrecognized order key: " + orderKey);
+                        continue;
+                    }
+
+                    // Second argument asks the chain to reverse this comparator.
+                    cchain.addComparator(c, !ascending);
+                }
+
+                // A chain without comparators throws on its first compare, so
+                // only sort when at least one order key was recognized.
+                if (cchain.size() > 0)
+                    Collections.sort(confs, cchain);
+            }
+
+        }
+
+        return confs;
+    }
}