Author: mszefler
Date: Wed Aug 30 09:57:27 2006
New Revision: 438543
URL: http://svn.apache.org/viewvc?rev=438543&view=rev
Log:
Fixed interceptor example, so only new instances are intercepted.
Added additional interceptor event for new instances.
Added:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
(with props)
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=438543&r1=438542&r2=438543&view=diff
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Wed Aug 30 09:57:27 2006
@@ -44,6 +44,7 @@
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
+import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
import org.apache.ode.bpel.o.OBase;
@@ -172,7 +173,7 @@
mex.getDAO().setProcess(getProcessDAO());
mex.setProcess(_oprocess);
- if (!processInterceptors(mex)) {
+ if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
__log.debug("Aborting processing of mex " + mex + " due to
interceptors.");
return;
}
@@ -281,39 +282,21 @@
* @return <code>true</code> if execution should continue,
* <code>false</code> otherwise
*/
- private boolean processInterceptors(MyRoleMessageExchangeImpl mex) {
+ private boolean processInterceptors(MyRoleMessageExchangeImpl mex,
InterceptorInvoker invoker) {
InterceptorContextImpl ictx = new
InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO());
for (MessageExchangeInterceptor i : _mexInterceptors)
- if (!processInterceptor(i,mex, ictx))
+ if (!mex.processInterceptor(i,mex, ictx, invoker))
return false;
for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
- if (!processInterceptor(i,mex, ictx))
+ if (!mex.processInterceptor(i,mex, ictx, invoker))
return false;
return true;
}
- private boolean processInterceptor(MessageExchangeInterceptor i,
MyRoleMessageExchangeImpl mex, InterceptorContext ictx) {
- __log.debug("onProcessInvoked --> interceptor " + i);
- try {
- i.onProcessInvoked(mex, ictx);
- } catch (FaultMessageExchangeException fme) {
- __log.debug("interceptor " + i + " caused invoke on " + this + "
to be aborted with FAULT " + fme.getFaultName());
- mex.setFault(fme.getFaultName().getLocalPart(),
fme.getFaultData());
- return false;
- } catch (AbortMessageExchangeException ame) {
- __log.debug("interceptor " + i + " cause invoke on " + this + "
to be aborted with FAILURE: "+ ame.getMessage());
- mex.setFailure(MessageExchange.FailureType.ABORTED,
__msgs.msgInterceptorAborted(mex
- .getMessageExchangeId(), i.toString(),
ame.getMessage()), null);
- return false;
- }
-
- return true;
- }
-
/**
* Replacement object for serializtation of the {@link OBase} (compiled
@@ -496,6 +479,12 @@
if (processDAO.isRetired()) {
throw new InvalidProcessException("Process is retired.",
InvalidProcessException.RETIRED_CAUSE_CODE);
}
+
+ if (!processInterceptors(mex,
InterceptorInvoker.__onNewInstanceInvoked)) {
+ __log.debug("Not creating a new instance for mex " +
mex + "; interceptor prevented!");
+ return;
+ }
+
ProcessInstanceDAO newInstance =
processDAO.createInstance(correlator);
BpelRuntimeContextImpl instance =
createRuntimeContext(newInstance, new PROCESS(_oprocess), mex);
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=438543&r1=438542&r2=438543&view=diff
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Wed Aug 30 09:57:27 2006
@@ -19,13 +19,11 @@
package org.apache.ode.bpel.engine;
-
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
@@ -36,121 +34,114 @@
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
+import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements
MyRoleMessageExchange {
+
+ private static final Log __log =
LogFactory.getLog(MyRoleMessageExchangeImpl.class);
+
+ public MyRoleMessageExchangeImpl(BpelEngineImpl engine, MessageExchangeDAO
mexdao) {
+ super(engine, mexdao);
+ }
+
+ public CorrelationStatus getCorrelationStatus() {
+ return CorrelationStatus.valueOf(getDAO().getCorrelationStatus());
+ }
+
+ void setCorrelationStatus(CorrelationStatus status) {
+ getDAO().setCorrelationStatus(status.toString());
+ }
+
+ /**
+ * Process the message-exchange interceptors.
+ *
+ * @param mex
+ * message exchange
+ * @return <code>true</code> if execution should continue,
+ * <code>false</code> otherwise
+ */
+ private boolean processInterceptors(MyRoleMessageExchangeImpl mex,
InterceptorInvoker invoker) {
+ InterceptorContextImpl ictx = new
InterceptorContextImpl(_engine._contexts.dao.getConnection(), null);
+
+ for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
+ if (!processInterceptor(i, mex, ictx, invoker))
+ return false;
+
+ return true;
+ }
+
+ boolean processInterceptor(MessageExchangeInterceptor i,
MyRoleMessageExchangeImpl mex, InterceptorContext ictx,
+ InterceptorInvoker invoker) {
+ __log.debug(invoker + "--> interceptor " + i);
+ try {
+ invoker.invoke(i, mex, ictx);
+ } catch (FaultMessageExchangeException fme) {
+ __log.debug("interceptor " + i + " caused invoke on " + this + "
to be aborted with FAULT " + fme.getFaultName());
+ mex.setFault(fme.getFaultName().getLocalPart(),
fme.getFaultData());
+ return false;
+ } catch (AbortMessageExchangeException ame) {
+ __log.debug("interceptor " + i + " cause invoke on " + this + " to
be aborted with FAILURE: " + ame.getMessage());
+ mex.setFailure(MessageExchange.FailureType.ABORTED,
__msgs.msgInterceptorAborted(mex.getMessageExchangeId(), i
+ .toString(), ame.getMessage()), null);
+ return false;
+ }
+ return true;
+ }
+
+ public void invoke(Message request) {
+ if (request == null) {
+ String errmsg = "Must pass non-null message to invoke()!";
+ __log.fatal(errmsg);
+ throw new NullPointerException(errmsg);
+ }
+
+ _dao.setRequest(((MessageImpl) request)._dao);
+ _dao.setStatus(MessageExchange.Status.REQUEST.toString());
+
+ if (!processInterceptors(this,
InterceptorInvoker.__onBpelServerInvoked))
+ return;
+
+ BpelProcess target = _engine.route(getDAO().getCallee(), request);
+
+ if (__log.isDebugEnabled())
+ __log.debug("invoke() EPR= " + _epr + " ==> " + target);
+
+ if (target == null) {
+ if (__log.isWarnEnabled())
+ __log.warn(__msgs.msgUnknownEPR("" + _epr));
+
+
setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT);
+ setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null,
null);
+ } else {
+ target.invokeProcess(this);
+ }
+
+ }
+
+ public void complete() {
+ }
+
+ public QName getServiceName() {
+ return getDAO().getCallee();
+ }
+
+ public void setClientId(String clientKey) {
+ getDAO().setCorrelationId(clientKey);
+ }
+
+ public String getClientId() {
+ return getDAO().getCorrelationId();
+ }
-class MyRoleMessageExchangeImpl extends MessageExchangeImpl
- implements MyRoleMessageExchange {
-
- private static final Log __log =
LogFactory.getLog(MyRoleMessageExchangeImpl.class);
-
- public MyRoleMessageExchangeImpl(
- BpelEngineImpl engine,
- MessageExchangeDAO mexdao) {
- super(engine, mexdao);
- }
-
-
- public CorrelationStatus getCorrelationStatus() {
- return CorrelationStatus.valueOf(getDAO().getCorrelationStatus());
- }
-
-
- void setCorrelationStatus(CorrelationStatus status) {
- getDAO().setCorrelationStatus(status.toString());
- }
-
- /**
- * Process the message-exchange interceptors.
- *
- * @param mex
- * message exchange
- * @return <code>true</code> if execution should continue,
- * <code>false</code> otherwise
- */
- private boolean processInterceptors(MyRoleMessageExchangeImpl mex) {
- InterceptorContextImpl ictx = new
InterceptorContextImpl(_engine._contexts.dao.getConnection(), null);
-
- for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
- if (!processInterceptor(i, mex, ictx))
- return false;
-
-
- return true;
- }
-
- private boolean processInterceptor(MessageExchangeInterceptor i,
MyRoleMessageExchangeImpl mex, InterceptorContext ictx) {
- __log.debug("onBpelServerInvoked --> interceptor " + i);
- try {
- i.onBpelServerInvoked(mex, ictx);
- } catch (FaultMessageExchangeException fme) {
- __log.debug("interceptor " + i + " caused invoke on " + this + " to
be aborted with FAULT " + fme.getFaultName());
- mex.setFault(fme.getFaultName().getLocalPart(), fme.getFaultData());
- return false;
- } catch (AbortMessageExchangeException ame) {
- __log.debug("interceptor " + i + " cause invoke on " + this + " to be
aborted with FAILURE: "+ ame.getMessage());
- mex.setFailure(MessageExchange.FailureType.ABORTED,
__msgs.msgInterceptorAborted(mex
- .getMessageExchangeId(), i.toString(),
ame.getMessage()), null);
- return false;
- }
- return true;
- }
-
- public void invoke(Message request) {
- if (request == null) {
- String errmsg = "Must pass non-null message to invoke()!";
- __log.fatal(errmsg);
- throw new NullPointerException(errmsg);
- }
-
- _dao.setRequest(((MessageImpl)request)._dao);
- _dao.setStatus(MessageExchange.Status.REQUEST.toString());
-
- if (!processInterceptors(this))
- return;
-
- BpelProcess target = _engine.route(getDAO().getCallee(), request);
-
- if (__log.isDebugEnabled())
- __log.debug("invoke() EPR= " + _epr + " ==> " + target);
-
-
- if (target == null) {
- if (__log.isWarnEnabled())
- __log.warn(__msgs.msgUnknownEPR("" + _epr));
-
-
setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.UKNOWN_ENDPOINT);
- setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, null,null);
- } else {
- target.invokeProcess(this);
- }
-
- }
-
- public void complete() {
- }
-
- public QName getServiceName() {
- return getDAO().getCallee();
- }
-
- public void setClientId(String clientKey) {
- getDAO().setCorrelationId(clientKey);
- }
-
-
- public String getClientId() {
- return getDAO().getCorrelationId();
- }
-
- public String toString() {
- try {
- return "{MyRoleMex#" + getMessageExchangeId()
- + " [Client " + getClientId() + "] calling " + getServiceName() + "." +
getOperationName() + "(...)}";
- } catch (Throwable t) {
- return "{MyRoleMex#???}";
+ public String toString() {
+ try {
+ return "{MyRoleMex#" + getMessageExchangeId() + " [Client " +
getClientId() + "] calling " + getServiceName() + "."
+ + getOperationName() + "(...)}";
+ } catch (Throwable t) {
+ return "{MyRoleMex#???}";
+ }
}
- }
}
Added:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java?rev=438543&view=auto
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
(added)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
Wed Aug 30 09:57:27 2006
@@ -0,0 +1,62 @@
+package org.apache.ode.bpel.intercept;
+
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+
+/**
+ * Helper for invoking the appropriate {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
+ * method -- basically a work-around for lack of closures.
+ * <p>
+ * Each predefined constant wraps exactly one interceptor callback, so a caller can select
+ * the callback to fire by passing the matching constant around. The constructor is private,
+ * so instances can only be created inside this class.
+ *
+ * @author mszefler
+ *
+ */
+public abstract class InterceptorInvoker {
+
+ private final String _name;
+ // Closures anyone?
+
+ /**
+ * Invoke {@link MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange, InterceptorContext)}.
+ * The message exchange is cast to {@link MyRoleMessageExchange} before the call.
+ */
+ public static final InterceptorInvoker __onProcessInvoked= new
InterceptorInvoker("onProcessInvoked") {
+ public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ throws FailMessageExchangeException,
FaultMessageExchangeException {
+ i.onProcessInvoked((MyRoleMessageExchange) mex, ictx);
+ }
+ };
+
+ /**
+ * Invoke {@link MessageExchangeInterceptor#onBpelServerInvoked(MyRoleMessageExchange, InterceptorContext)}.
+ * The message exchange is cast to {@link MyRoleMessageExchange} before the call.
+ */
+ public static final InterceptorInvoker __onBpelServerInvoked = new
InterceptorInvoker("onBpelServerInvoked") {
+ public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ throws FailMessageExchangeException,
FaultMessageExchangeException {
+ i.onBpelServerInvoked((MyRoleMessageExchange) mex,
ictx);
+ }
+ };
+
+ /**
+ * Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorContext)}.
+ * The message exchange is cast to {@link PartnerRoleMessageExchange} before the call.
+ */
+ public static final InterceptorInvoker __onPartnerInvoked = new
InterceptorInvoker("onPartnerInvoked") {
+ public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ throws FailMessageExchangeException,
FaultMessageExchangeException {
+ i.onPartnerInvoked((PartnerRoleMessageExchange) mex,
ictx);
+ }
+ };
+
+ /**
+ * Invoke {@link MessageExchangeInterceptor#onNewInstanceInvoked(MyRoleMessageExchange, InterceptorContext)}.
+ * The message exchange is cast to {@link MyRoleMessageExchange} before the call.
+ */
+ public static final InterceptorInvoker __onNewInstanceInvoked = new
InterceptorInvoker("onNewInstanceInvoked") {
+ public void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ throws FailMessageExchangeException,
FaultMessageExchangeException {
+ i.onNewInstanceInvoked((MyRoleMessageExchange) mex,
ictx);
+ }
+ };
+
+ /**
+ * Create an invoker with the given display name.
+ *
+ * @param name
+ *            name of the wrapped callback; only used by {@link #toString()}
+ */
+ private InterceptorInvoker(String name) {
+ _name = name;
+ }
+ /**
+ * Invoke the interceptor callback that this invoker represents.
+ *
+ * @param i
+ *            interceptor whose callback is called
+ * @param mex
+ *            message exchange handed to the callback; the predefined invokers cast it
+ *            to the exchange type their callback expects
+ * @param ictx
+ *            interceptor context, passed to the callback as-is
+ * @throws FailMessageExchangeException
+ *             declared so the callback's exception can propagate to the caller
+ * @throws FaultMessageExchangeException
+ *             declared so the callback's exception can propagate to the caller
+ */
+ public abstract void invoke(MessageExchangeInterceptor i,
MessageExchange mex, InterceptorContext ictx)
+ throws FailMessageExchangeException,
FaultMessageExchangeException;
+ /**
+ * @return the fully-qualified name of this class followed by <code>.</code> and the
+ *         callback name given at construction
+ */
+ public String toString() {
+ return InterceptorInvoker.class.getName() + "." + _name;
+ }
+}
Propchange:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java?rev=438543&r1=438542&r2=438543&view=diff
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
Wed Aug 30 09:57:27 2006
@@ -53,6 +53,17 @@
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
+ * Called when the BPEL server is invoked, after the message exchange has
+ * been routed to the process and it has been determined that a new
instance
+ * needs to be created.
+ *
+ * @param mex
+ * message exchange
+ */
+ void onNewInstanceInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ throws FailMessageExchangeException, FaultMessageExchangeException;
+
+ /**
* Called when the BPEL server is invoked, before any attempt to route the
* message exchange to a process.
*
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java?rev=438543&r1=438542&r2=438543&view=diff
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
Wed Aug 30 09:57:27 2006
@@ -20,13 +20,15 @@
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import
org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
/**
- * No-Op implementation of the {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor} interface;
+ * No-Op implementation of the
+ * {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor} interface;
* good for sub-classing.
*
* @author mszefler
- *
+ *
*/
public class NoOpInterceptor implements MessageExchangeInterceptor {
@@ -43,6 +45,12 @@
public void onPartnerInvoked(PartnerRoleMessageExchange mex,
InterceptorContext ic) throws
FailMessageExchangeException,
FaultMessageExchangeException {
+ }
+
+ public void onNewInstanceInvoked(MyRoleMessageExchange mex,
+ InterceptorContext ic) throws
FailMessageExchangeException,
+ FaultMessageExchangeException {
+
}
}
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java?rev=438543&r1=438542&r2=438543&view=diff
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
Wed Aug 30 09:57:27 2006
@@ -24,7 +24,6 @@
import org.apache.ode.bpel.dao.ProcessPropertyDAO;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
/**
* An example of a simple interceptor providing a "throttling" capability -
that is an
@@ -37,7 +36,7 @@
private static final QName PROP_MAX_INSTANCES = new
QName("urn:org.apache.ode.bpel.intercept", "maxInstances");
@Override
- public void onProcessInvoked(MyRoleMessageExchange mex,
+ public void onNewInstanceInvoked(MyRoleMessageExchange mex,
InterceptorContext ic) throws
FailMessageExchangeException {
int maxInstances;
try {