Author: gatfora
Date: Thu Mar 1 23:48:10 2007
New Revision: 513646
URL: http://svn.apache.org/viewvc?view=rev&rev=513646
Log:
Check for sequences timed out on the Sender rather than when a message arrives,
Delete terminated RMS/RMDBeans after a configurable period, Terminate RMDBeans
after the Inactivity timeout is reached
Modified:
webservices/sandesha/trunk/java/config/module.xml
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/test-resources/sandesha2.properties
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/PropertyLoaderTest.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/faulttests/SequenceTimedOutTest.java
Modified: webservices/sandesha/trunk/java/config/module.xml
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/module.xml?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
--- webservices/sandesha/trunk/java/config/module.xml (original)
+++ webservices/sandesha/trunk/java/config/module.xml Thu Mar 1 23:48:10 2007
@@ -93,9 +93,15 @@
<sandesha2:ExponentialBackoff>false</sandesha2:ExponentialBackoff>
<sandesha2:InactivityTimeout>60</sandesha2:InactivityTimeout>
-
+
<sandesha2:InactivityTimeoutMeasure>seconds</sandesha2:InactivityTimeoutMeasure>
-
+
+ <!-- Once a sequence has been marked as deleted, or timed out, this
is the length of time that the
+ sequence will remain before all sequence state is totally
removed -->
+
<sandesha2:SequenceRemovalTimeout>600</sandesha2:SequenceRemovalTimeout>
+
+
<sandesha2:SequenceRemovalTimeoutMeasure>seconds</sandesha2:SequenceRemovalTimeoutMeasure>
+
<sandesha2:InvokeInOrder>true</sandesha2:InvokeInOrder>
<!-- These will not be overriden by service level policies -->
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
Thu Mar 1 23:48:10 2007
@@ -369,71 +369,79 @@
public interface Properties {
- String RetransmissionInterval = "RetransmissionInterval";
+ public static final String RetransmissionInterval =
"RetransmissionInterval";
- String AcknowledgementInterval = "AcknowledgementInterval";
+ public static final String AcknowledgementInterval =
"AcknowledgementInterval";
- String ExponentialBackoff = "ExponentialBackoff";
+ public static final String ExponentialBackoff =
"ExponentialBackoff";
- String InactivityTimeout = "InactivityTimeout";
+ public static final String InactivityTimeout =
"InactivityTimeout";
- String InactivityTimeoutMeasure = "InactivityTimeoutMeasure";
+ public static final String InactivityTimeoutMeasure =
"InactivityTimeoutMeasure";
+
+ public static final String SequenceRemovalTimeout =
"SequenceRemovalTimeout";
+ public static final String SequenceRemovalTimeoutMeasure =
"SequenceRemovalTimeoutMeasure";
+
// String StorageManager = "StorageManager";
- String InMemoryStorageManager = "InMemoryStorageManager";
+ public static final String InMemoryStorageManager =
"InMemoryStorageManager";
- String PermanentStorageManager = "PermanentStorageManager";
+ public static final String PermanentStorageManager =
"PermanentStorageManager";
- String InOrderInvocation = "InvokeInOrder";
+ public static final String InOrderInvocation = "InvokeInOrder";
- String MessageTypesToDrop = "MessageTypesToDrop";
+ public static final String MessageTypesToDrop =
"MessageTypesToDrop";
- String RetransmissionCount = "RetransmissionCount";
+ public static final String RetransmissionCount =
"RetransmissionCount";
- String SecurityManager = "SecurityManager";
+ public static final String SecurityManager = "SecurityManager";
- String EnableMakeConnection = "EnableMakeConnection";
+ public static final String EnableMakeConnection =
"EnableMakeConnection";
- String EnableRMAnonURI = "EnableRMAnonURI";
+ public static final String EnableRMAnonURI = "EnableRMAnonURI";
- String UseMessageSerialization = "UseMessageSerialization";
+ public static final String UseMessageSerialization =
"UseMessageSerialization";
public interface DefaultValues {
- int RetransmissionInterval = 6000;
+ public static final int RetransmissionInterval = 6000;
- int AcknowledgementInterval = 3000;
+ public static final int AcknowledgementInterval = 3000;
- boolean ExponentialBackoff = true;
+ public static final boolean ExponentialBackoff = true;
- int InactivityTimeout = -1;
+ public static final int InactivityTimeout = -1;
- String InactivityTimeoutMeasure = "seconds"; //this
can be - seconds,minutes,hours,days
+ public static final String InactivityTimeoutMeasure =
"seconds"; //this can be - seconds,minutes,hours,days
+ public static final int sequenceRemovalTimeout = -1;
+
+ public static final String
sequenceRemovalTimeoutMeasure = "seconds"; //this can be -
seconds,minutes,hours,days
+
// String StorageManager =
"org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
- String InMemoryStorageManager =
"org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
+ public static final String InMemoryStorageManager =
"org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
- String PermanentStorageManager =
"org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
+ public static final String PermanentStorageManager =
"org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
- boolean InvokeInOrder = true;
+ public static final boolean InvokeInOrder = true;
- String MessageTypesToDrop=VALUE_NONE;
+ public static final String
MessageTypesToDrop=VALUE_NONE;
- int RetransmissionCount = 8;
+ public static final int RetransmissionCount = 8;
- int MaximumRetransmissionCount = 10;
+ public static final int MaximumRetransmissionCount = 10;
- String SecurityManager =
"org.apache.sandesha2.security.dummy.DummySecurityManager";
+ public static final String SecurityManager =
"org.apache.sandesha2.security.dummy.DummySecurityManager";
- boolean EnableMakeConnection = true;
+ public static final boolean EnableMakeConnection = true;
- boolean EnableRMAnonURI = true;
+ public static final boolean EnableRMAnonURI = true;
- boolean UseMessageSerialization = false;
+ public static final boolean UseMessageSerialization =
false;
- boolean enforceRM = false;
+ public static final boolean enforceRM = false;
}
}
@@ -551,6 +559,8 @@
public static final String ELEM_EXP_BACKOFF = "ExponentialBackoff";
public static final String ELEM_INACTIVITY_TIMEOUT =
"InactivityTimeout";
public static final String ELEM_INACTIVITY_TIMEOUT_MEASURES =
"InactivityTimeoutMeasure";
+ public static final String ELEM_DELETION_TIMEOUT =
"SequenceRemovalTimeout";
+ public static final String ELEM_DELETION_TIMEOUT_MEASURES =
"SequenceRemovalTimeoutMeasure";
public static final String ELEM_INVOKE_INORDER = "InvokeInOrder";
public static final String ELEM_MSG_TYPES_TO_DROP =
"MessageTypesToDrop";
public static final String ELEM_STORAGE_MGR = "StorageManagers";
@@ -572,6 +582,8 @@
public static final QName Q_ELEM_EXP_BACKOFF = new
QName(URI_RM_POLICY_NS, ELEM_EXP_BACKOFF, ATTR_WSRM);
public static final QName Q_ELEM_INACTIVITY_TIMEOUT = new
QName(URI_RM_POLICY_NS, ELEM_INACTIVITY_TIMEOUT, ATTR_WSRM);
public static final QName Q_ELEM_INACTIVITY_TIMEOUT_MEASURES = new
QName(URI_RM_POLICY_NS, ELEM_INACTIVITY_TIMEOUT_MEASURES, ATTR_WSRM);
+ public static final QName Q_ELEM_SEQUENCE_REMOVAL_TIMEOUT = new
QName(URI_RM_POLICY_NS, ELEM_DELETION_TIMEOUT, ATTR_WSRM);
+ public static final QName Q_ELEM_SEQUENCE_REMOVAL_TIMEOUT_MEASURES =
new QName(URI_RM_POLICY_NS, ELEM_DELETION_TIMEOUT_MEASURES, ATTR_WSRM);
public static final QName Q_ELEM_INVOKE_INORDER = new
QName(URI_RM_POLICY_NS, ELEM_INVOKE_INORDER, ATTR_WSRM);
public static final QName Q_ELEM_MSG_TYPES_TO_DROP = new
QName(URI_RM_POLICY_NS, ELEM_MSG_TYPES_TO_DROP, ATTR_WSRM);
public static final QName Q_ELEM_STORAGE_MGR =new
QName(URI_RM_POLICY_NS, ELEM_STORAGE_MGR, ATTR_WSRM);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Thu Mar 1 23:48:10 2007
@@ -210,6 +210,8 @@
// Store the security token for the offered sequence
rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
+
rMDBean.setLastActivatedTime(System.currentTimeMillis());
+
rmdBeanMgr.insert(rMDBean);
SandeshaUtil.startWorkersForSequence(configCtx,
rMDBean);
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Thu Mar 1 23:48:10 2007
@@ -273,6 +273,9 @@
}
}
+ // Set the last activated time
+ bean.setLastActivatedTime(System.currentTimeMillis());
+
// Update the RMD bean
mgr.update(bean);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
Thu Mar 1 23:48:10 2007
@@ -59,6 +59,12 @@
private long inactivityTimeoutInterval = -1;
private boolean inactivityTimeoutIntervalSet = false;
+
+ private long sequenceRemovalTimeoutValue;
+ private String sequenceRemovalTimeoutMeasure;
+
+ private long sequenceRemovalTimeoutInterval = -1;
+ private boolean sequenceRemovalTimeoutIntervalSet = false;
private long acknowledgementInterval;
private boolean acknowledgementIntervalSet = false;
@@ -103,6 +109,25 @@
}
+ public void setSequenceRemovalTimeoutInterval(long value, String
measure) {
+ long timeOut = 0;
+
+ if (measure == null) {
+ this.sequenceRemovalTimeoutInterval = value;
+ } else if ("seconds".equals(measure)) {
+ timeOut = value * 1000;
+ } else if ("minutes".equals(measure)) {
+ timeOut = value * 60 * 1000;
+ } else if ("hours".equals(measure)) {
+ timeOut = value * 60 * 60 * 1000;
+ } else if ("days".equals(measure)) {
+ timeOut = value * 24 * 60 * 60 * 1000;
+ }
+
+ this.sequenceRemovalTimeoutInterval = timeOut;
+
+ }
+
public void setAcknowledgementInterval(long acknowledgementInterval) {
this.acknowledgementInterval = acknowledgementInterval;
setAcknowledgementIntervalSet(true);
@@ -243,7 +268,7 @@
writer.writeStartElement(prefix,
Sandesha2Constants.Assertions.Q_ELEM_EXP_BACKOFF.getLocalPart(), namespaceURI);
writer.writeCharacters(Boolean.toString(isExponentialBackoff()));
writer.writeEndElement();
-
+
// <wsrm:InactivityTimeout />
writer.writeStartElement(prefix,
Sandesha2Constants.Assertions.Q_ELEM_INACTIVITY_TIMEOUT.getLocalPart(),
namespaceURI);
writer.writeCharacters(Long.toString(getInactivityTimeoutInterval()));
@@ -253,7 +278,17 @@
writer.writeStartElement(prefix,
Sandesha2Constants.Assertions.Q_ELEM_INACTIVITY_TIMEOUT_MEASURES.getLocalPart(),
namespaceURI);
writer.writeCharacters(inactivityTimeoutMeasure);
writer.writeEndElement();
+
+ // <wsrm:SequenceRemovalTimeout />
+ writer.writeStartElement(prefix,
Sandesha2Constants.Assertions.Q_ELEM_SEQUENCE_REMOVAL_TIMEOUT.getLocalPart(),
namespaceURI);
+
writer.writeCharacters(Long.toString(getSequenceRemovalTimeoutInterval()));
+ writer.writeEndElement();
+ // <wsrm:SequenceRemovalTimeoutMeasure />
+ writer.writeStartElement(prefix,
Sandesha2Constants.Assertions.Q_ELEM_SEQUENCE_REMOVAL_TIMEOUT_MEASURES.getLocalPart(),
namespaceURI);
+ writer.writeCharacters(sequenceRemovalTimeoutMeasure);
+ writer.writeEndElement();
+
// <wsrm:InvokeInOrder />
writer.writeStartElement(prefix,
Sandesha2Constants.Assertions.Q_ELEM_INVOKE_INORDER.getLocalPart(),
namespaceURI);
writer.writeCharacters(Boolean.toString(isInOrder()));
@@ -377,6 +412,14 @@
return inactivityTimeoutInterval;
}
+ public long getSequenceRemovalTimeoutInterval() {
+ if (sequenceRemovalTimeoutInterval < 0)
+ setSequenceRemovalTimeoutInterval(sequenceRemovalTimeoutValue,
+ sequenceRemovalTimeoutMeasure);
+
+ return sequenceRemovalTimeoutInterval;
+ }
+
public void setInactiveTimeoutValue(long inactiveTimeoutValue) {
this.inactiveTimeoutValue = inactiveTimeoutValue;
setInactiveTimeoutValueSet(true);
@@ -385,6 +428,15 @@
public void setInactivityTimeoutMeasure(String inactivityTimeoutMeasure) {
this.inactivityTimeoutMeasure = inactivityTimeoutMeasure;
}
+
+ public void setSequenceRemovalTimeoutValue(long
sequenceRemovalTimeoutValue) {
+ this.sequenceRemovalTimeoutValue = sequenceRemovalTimeoutValue;
+ setSequenceRemovalTimeoutValueSet(true);
+ }
+
+ public void setSequenceRemovalTimeoutMeasure(String
sequenceRemovalTimeoutMeasure) {
+ this.sequenceRemovalTimeoutMeasure = sequenceRemovalTimeoutMeasure;
+ }
public boolean isEnableMakeConnection() {
return enableMakeConnection;
@@ -487,6 +539,14 @@
protected void setInactiveTimeoutValueSet(boolean
inactiveTimeoutValueSet) {
this.inactiveTimeoutValueSet = inactiveTimeoutValueSet;
+ }
+
+ protected void setSequenceRemovalTimeoutValueSet(boolean
sequenceRemovalTimeoutIntervalSet) {
+ this.sequenceRemovalTimeoutIntervalSet =
sequenceRemovalTimeoutIntervalSet;
+ }
+
+ protected boolean isSequenceRemovalTimeoutValueSet() {
+ return sequenceRemovalTimeoutIntervalSet;
}
protected boolean isInactivityTimeoutIntervalSet() {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
Thu Mar 1 23:48:10 2007
@@ -77,8 +77,19 @@
} else if
(Sandesha2Constants.Assertions.ELEM_INACTIVITY_TIMEOUT_MEASURES
.equals(name)) {
- //using the previously set Inavtivity Timeout
+ //using the previously set Inactivity Timeout
propertyBean.setInactivityTimeoutMeasure
(element.getText().trim());
+
+ } else if (Sandesha2Constants.Assertions.ELEM_DELETION_TIMEOUT
+ .equals(name)) {
+
+ propertyBean.setSequenceRemovalTimeoutValue
(Long.parseLong(element
+ .getText().trim()));
+
+ } else if
(Sandesha2Constants.Assertions.ELEM_DELETION_TIMEOUT_MEASURES
+ .equals(name)) {
+ //using the previously set Inavtivity Timeout
+ propertyBean.setSequenceRemovalTimeoutMeasure
(element.getText().trim());
} else if (Sandesha2Constants.Assertions.ELEM_INVOKE_INORDER
.equals(name)) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Thu Mar 1 23:48:10 2007
@@ -19,18 +19,11 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClient;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaListener;
-import org.apache.sandesha2.client.SequenceReport;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -72,9 +65,6 @@
if (maxRetransmissionAttempts >= 0 &&
retransmitterBean.getSentCount() > maxRetransmissionAttempts)
timeOutSequence = true;
- if (!timeOutSequence)
- timeOutSequence =
SequenceManager.hasSequenceTimedOut(internalSequenceID, rmMsgCtx,
storageManager);
-
if (timeOutSequence) {
retransmitterBean.setSend(false);
@@ -86,7 +76,7 @@
// Only messages of outgoing sequences get
retransmitted. So named
// following method according to that.
- finalizeTimedOutSequence(internalSequenceID,
rmMsgCtx.getMessageContext(), storageManager);
+
SequenceManager.finalizeTimedOutSequence(internalSequenceID,
rmMsgCtx.getMessageContext(), storageManager);
continueSending = false;
}
}
@@ -107,8 +97,6 @@
*/
private static SenderBean adjustNextRetransmissionTime(SenderBean
retransmitterBean, SandeshaPolicyBean propertyBean) throws SandeshaException {
- // long lastSentTime = retransmitterBean.getTimeToSend();
-
int count = retransmitterBean.getSentCount();
long baseInterval = propertyBean.getRetransmissionInterval();
@@ -135,27 +123,6 @@
}
return interval;
- }
-
- private static void finalizeTimedOutSequence(String internalSequenceID,
MessageContext messageContext,
- StorageManager storageManager) throws SandeshaException
{
- ConfigurationContext configurationContext =
messageContext.getConfigurationContext();
-
- // Notify the clients of a timeout
- AxisFault fault = new AxisFault(
-
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout,
internalSequenceID));
- // Notify any waiting clients that the sequence has timeed out.
- FaultManager.notifyClientsOfFault(internalSequenceID,
storageManager, configurationContext, fault);
-
- // Already an active transaction, so don't want a new one
- TerminateManager.timeOutSendingSideSequence(internalSequenceID,
storageManager);
-
- SandeshaListener listener = (SandeshaListener) messageContext
-
.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
- if (listener != null) {
- SequenceReport report =
SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
configurationContext, false);
- listener.onTimeOut(report);
- }
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
Thu Mar 1 23:48:10 2007
@@ -49,6 +49,9 @@
propertyBean.setInactiveTimeoutInterval(Sandesha2Constants.Properties.DefaultValues.InactivityTimeout,
Sandesha2Constants.Properties.DefaultValues.InactivityTimeoutMeasure);
+
propertyBean.setSequenceRemovalTimeoutInterval(Sandesha2Constants.Properties.DefaultValues.sequenceRemovalTimeout,
+
Sandesha2Constants.Properties.DefaultValues.sequenceRemovalTimeoutMeasure);
+
propertyBean.setInOrder(Sandesha2Constants.Properties.DefaultValues.InvokeInOrder);
propertyBean.setMsgTypesToDrop(null);
propertyBean.setRetransmissionInterval(Sandesha2Constants.Properties.DefaultValues.RetransmissionInterval);
@@ -99,6 +102,11 @@
.getProperty(Sandesha2Constants.Properties.InactivityTimeoutMeasure);
loadInactivityTimeout(inactivityTimeoutStr,
inactivityTimeoutMeasure, propertyBean);
+ String sequenceRemovalTimeoutStr =
properties.getProperty(Sandesha2Constants.Properties.SequenceRemovalTimeout);
+ String sequenceRemovalTimeoutMeasure = properties
+
.getProperty(Sandesha2Constants.Properties.SequenceRemovalTimeoutMeasure);
+ loadSequenceRemovalTimeout(sequenceRemovalTimeoutStr,
sequenceRemovalTimeoutMeasure, propertyBean);
+
// String storageMgrClassStr = properties
//
.getProperty(Sandesha2Constants.Properties.StorageManager);
//
loadStoragemanagerClass(storageMgrClassStr,propertyBean);
@@ -155,6 +163,13 @@
String inactivityTimeoutMeasure = (String)
inactivityTimeoutMeasureParam.getValue();
loadInactivityTimeout(inactivityTimeoutStr,
inactivityTimeoutMeasure, propertyBean);
+ Parameter sequenceRemovalTimeoutParam =
desc.getParameter(Sandesha2Constants.Properties.SequenceRemovalTimeout);
+ String sequenceRemovalTimeoutStr = (String)
sequenceRemovalTimeoutParam.getValue();
+ Parameter sequenceRemovalTimeoutMeasureParam = desc
+
.getParameter(Sandesha2Constants.Properties.SequenceRemovalTimeoutMeasure);
+ String sequenceRemovalTimeoutMeasure = (String)
sequenceRemovalTimeoutMeasureParam.getValue();
+ loadSequenceRemovalTimeout(sequenceRemovalTimeoutStr,
sequenceRemovalTimeoutMeasure, propertyBean);
+
// Parameter storageMgrClassParam =
//
desc.getParameter(Sandesha2Constants.Properties.StorageManager);
// String storageMgrClassStr = (String)
storageMgrClassParam.getValue();
@@ -291,8 +306,7 @@
*
* @param properties
*/
- private static void loadExponentialBackoff(String expoBackoffStr,
SandeshaPolicyBean propertyBean)
- throws SandeshaException {
+ private static void loadExponentialBackoff(String expoBackoffStr,
SandeshaPolicyBean propertyBean) {
if (expoBackoffStr != null) {
expoBackoffStr = expoBackoffStr.trim();
@@ -373,12 +387,36 @@
}
/**
+ * Loads wsp:inactivityInterval.
+ *
+ * @param properties
+ */
+ private static void loadSequenceRemovalTimeout(String
sequenceRemovalTimeoutStr, String sequenceRemovalTimeoutMeasure,
+ SandeshaPolicyBean propertyBean) throws
SandeshaException {
+
+ if (sequenceRemovalTimeoutStr != null &&
sequenceRemovalTimeoutMeasure != null) {
+ try {
+ sequenceRemovalTimeoutStr =
sequenceRemovalTimeoutStr.trim();
+ sequenceRemovalTimeoutMeasure =
sequenceRemovalTimeoutMeasure.trim();
+
+ int sequenceRemovalTimeoutVal =
Integer.parseInt(sequenceRemovalTimeoutStr);
+ if (sequenceRemovalTimeoutVal > 0) {
+
propertyBean.setSequenceRemovalTimeoutInterval(sequenceRemovalTimeoutVal,
sequenceRemovalTimeoutMeasure);
+ }
+ } catch (NumberFormatException e) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDerriveInactivityTimeout);
+ throw new SandeshaException(message, e);
+ }
+ }
+ }
+
+ /**
* Loads the InMemoryStorageManager class name.
*
* @param properties
*/
private static void loadInMemoryStoragemanagerClass(String
inMemoryStorageMgrClassStr,
- SandeshaPolicyBean propertyBean) throws
SandeshaException {
+ SandeshaPolicyBean propertyBean){
if (inMemoryStorageMgrClassStr != null) {
inMemoryStorageMgrClassStr =
inMemoryStorageMgrClassStr.trim();
propertyBean.setInMemoryStorageManagerClass(inMemoryStorageMgrClassStr);
@@ -391,15 +429,14 @@
* @param properties
*/
private static void loadPermanentStoragemanagerClass(String
permanentStorageMgrClassStr,
- SandeshaPolicyBean propertyBean) throws
SandeshaException {
+ SandeshaPolicyBean propertyBean) {
if (permanentStorageMgrClassStr != null) {
permanentStorageMgrClassStr =
permanentStorageMgrClassStr.trim();
propertyBean.setPermanentStorageManagerClass(permanentStorageMgrClassStr);
}
}
- private static void loadInOrderInvocation(String inOrderInvocation,
SandeshaPolicyBean propertyBean)
- throws SandeshaException {
+ private static void loadInOrderInvocation(String inOrderInvocation,
SandeshaPolicyBean propertyBean) {
if (inOrderInvocation != null) {
inOrderInvocation = inOrderInvocation.trim();
@@ -411,8 +448,7 @@
}
}
- private static void loadEnableMakeConnection(String
enableMakeConnection, SandeshaPolicyBean propertyBean)
- throws SandeshaException {
+ private static void loadEnableMakeConnection(String
enableMakeConnection, SandeshaPolicyBean propertyBean) {
if (enableMakeConnection != null) {
enableMakeConnection = enableMakeConnection.trim();
@@ -424,8 +460,7 @@
}
}
- private static void loadUseSerialization(String useSerialization,
SandeshaPolicyBean propertyBean)
- throws SandeshaException {
+ private static void loadUseSerialization(String useSerialization,
SandeshaPolicyBean propertyBean) {
if (useSerialization != null) {
useSerialization = useSerialization.trim();
@@ -465,7 +500,7 @@
*
* @param properties
*/
- private static void loadSecurityManagerClass(String
securityManagerClassStr, SandeshaPolicyBean propertyBean) throws
SandeshaException {
+ private static void loadSecurityManagerClass(String
securityManagerClassStr, SandeshaPolicyBean propertyBean) {
if (securityManagerClassStr != null) {
securityManagerClassStr =
securityManagerClassStr.trim();
propertyBean.setSecurityManagerClass(securityManagerClassStr);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Thu Mar 1 23:48:10 2007
@@ -28,14 +28,16 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.client.SequenceReport;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.workers.SequenceEntry;
@@ -112,8 +114,6 @@
String tokenData =
securityManager.getTokenRecoveryData(token);
rmdBean.setSecurityTokenData(tokenData);
}
-
- RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
rmdBean.setSequenceID(sequenceId);
rmdBean.setNextMsgNoToProcess(1);
@@ -142,8 +142,9 @@
}
rmdBean.setRMVersion(specVersion);
+ rmdBean.setLastActivatedTime(System.currentTimeMillis());
- nextMsgMgr.insert(rmdBean);
+ storageManager.getRMDBeanMgr().insert(rmdBean);
// TODO get the SOAP version from the create seq message.
@@ -279,26 +280,50 @@
return rmsBean;
}
- public static boolean hasSequenceTimedOut(String internalSequenceId,
RMMsgContext rmMsgCtx, StorageManager storageManager)
+ public static boolean hasSequenceTimedOut(RMSBean rmsBean, String
internalSequenceId, StorageManager storageManager)
throws SandeshaException {
- // operation is the lowest level, Sandesha2 could be engaged.
- SandeshaPolicyBean propertyBean =
SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext()
- .getAxisOperation());
+ SandeshaPolicyBean propertyBean =
+
SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
if (propertyBean.getInactivityTimeoutInterval() <= 0)
return false;
boolean sequenceTimedOut = false;
-
- RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
- if (rmsBean != null) {
- long lastActivatedTime = rmsBean.getLastActivatedTime();
- long timeNow = System.currentTimeMillis();
- if (lastActivatedTime > 0 && (lastActivatedTime +
propertyBean.getInactivityTimeoutInterval() < timeNow))
- sequenceTimedOut = true;
- }
+ long lastActivatedTime = rmsBean.getLastActivatedTime();
+ long timeNow = System.currentTimeMillis();
+ if (lastActivatedTime > 0 && (lastActivatedTime +
propertyBean.getInactivityTimeoutInterval() < timeNow))
+ sequenceTimedOut = true;
+
return sequenceTimedOut;
}
+
+ public static void finalizeTimedOutSequence(String internalSequenceID,
MessageContext messageContext,
+ StorageManager storageManager) throws SandeshaException
{
+ ConfigurationContext configurationContext = null;
+ if (messageContext == null)
+ configurationContext = storageManager.getContext();
+ else
+ configurationContext =
messageContext.getConfigurationContext();
+
+ // Notify the clients of a timeout
+ AxisFault fault = new AxisFault(
+
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout,
internalSequenceID));
+ // Notify any waiting clients that the sequence has timeed out.
+ FaultManager.notifyClientsOfFault(internalSequenceID,
storageManager, configurationContext, fault);
+
+ // Already an active transaction, so don't want a new one
+ TerminateManager.timeOutSendingSideSequence(internalSequenceID,
storageManager);
+
+ if (messageContext != null) {
+ SandeshaListener listener = (SandeshaListener)
messageContext
+
.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+ if (listener != null) {
+ SequenceReport report =
SandeshaClient.getOutgoingSequenceReport(internalSequenceID,
configurationContext, false);
+ listener.onTimeOut(report);
+ }
+ }
+ }
+
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Thu Mar 1 23:48:10 2007
@@ -248,6 +248,7 @@
RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
rmsBean.setTimedOut(true);
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
storageManager.getRMSBeanMgr().update(rmsBean);
cleanSendingSideData(internalSequenceId, storageManager);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Thu Mar 1 23:48:10 2007
@@ -18,17 +18,25 @@
package org.apache.sandesha2.workers;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.SandeshaStorageException;
+import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
/**
* This is responsible for sending and re-sending messages of Sandesha2. This
@@ -59,7 +67,8 @@
// Pick a sequence using a round-robin approach
ArrayList allSequencesList = getSequences();
int size = allSequencesList.size();
- log.debug("Choosing one from " + size + " sequences");
+ if (log.isDebugEnabled())
+ log.debug("Choosing one from " + size + "
sequences");
if(nextIndex >= size) {
nextIndex = 0;
@@ -70,13 +79,17 @@
}
processedMessage = false;
+ // At this point - delete any sequences that
have timed out, or been terminated.
+ deleteTerminatedSequences(storageManager);
+
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, looped over all sequences, sleep " + sleep);
return sleep;
}
SequenceEntry entry = (SequenceEntry)
allSequencesList.get(nextIndex++);
String sequenceId = entry.getSequenceId();
- log.debug("Chose sequence " + sequenceId);
+ if (log.isDebugEnabled())
+ log.debug("Chose sequence " + sequenceId);
transaction = storageManager.getTransaction();
@@ -87,9 +100,12 @@
matcher.setInternalSequenceID(sequenceId);
matcher.setTerminated(false);
RMSBean rms =
storageManager.getRMSBeanMgr().findUnique(matcher);
- if(rms != null) {
- sequenceId = rms.getSequenceID();
- found = true;
+ if(rms != null && !rms.isTerminated() &&
!rms.isTimedOut()) {
+ sequenceId = rms.getSequenceID();
+ if
(SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
+
SequenceManager.finalizeTimedOutSequence(sequenceId, null, storageManager);
+ else
+ found = true;
}
} else {
RMDBean matcher = new RMDBean();
@@ -192,4 +208,128 @@
return false;
}
+ /**
+ * Finds any RMDBeans that have not been used inside the set
InnactivityTimeoutInterval
+ *
+ * Iterates through RMSBeans and RMDBeans that have been terminated or
timed out and
+ * deletes them.
+ *
+ */
+ private void deleteTerminatedSequences(StorageManager storageManager) {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::deleteTerminatedSequences");
+
+ RMSBean finderBean = new RMSBean();
+ finderBean.setTerminated(true);
+
+ Transaction transaction = storageManager.getTransaction();
+
+ try {
+
+ SandeshaPolicyBean propertyBean =
+
SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
+
+ // Find terminated sequences.
+ List rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+ deleteRMSBeans(rmsBeans, propertyBean);
+
+ finderBean.setTerminated(false);
+ finderBean.setTimedOut(true);
+
+ // Find timed out sequences
+ rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
+
+ deleteRMSBeans(rmsBeans, propertyBean);
+
+ // Remove any terminated RMDBeans.
+ RMDBean finderRMDBean = new RMDBean();
+ finderRMDBean.setTerminated(true);
+
+ List rmdBeans = storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+ Iterator beans = rmdBeans.iterator();
+ while (beans.hasNext()) {
+ RMDBean rmdBean = (RMDBean)beans.next();
+
+ long timeNow = System.currentTimeMillis();
+ long lastActivated = rmdBean.getLastActivatedTime();
+ long deleteTime =
propertyBean.getSequenceRemovalTimeoutInterval();
+
+ if (deleteTime < 0)
+ deleteTime = 0;
+
+ // delete sequences that have been timedout or deleted for more
than
+ // the SequenceRemovalTimeoutInterval
+ if ((lastActivated + deleteTime) < timeNow) {
+ if (log.isDebugEnabled())
+ log.debug("Deleting RMDBean " + deleteTime + "
: " + rmdBean);
+
storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+ }
+ }
+
+ // Terminate RMD Sequences that have been inactive.
+ if (propertyBean.getInactivityTimeoutInterval() > 0) {
+ finderRMDBean.setTerminated(false);
+
+ rmdBeans =
storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+ beans = rmdBeans.iterator();
+ while (beans.hasNext()) {
+ RMDBean rmdBean = (RMDBean)beans.next();
+
+ long timeNow = System.currentTimeMillis();
+ long lastActivated = rmdBean.getLastActivatedTime();
+
+ if ((lastActivated +
propertyBean.getInactivityTimeoutInterval()) < timeNow) {
+ // Terminate
+ rmdBean.setTerminated(true);
+ rmdBean.setLastActivatedTime(timeNow);
+ if (log.isDebugEnabled())
+ log.debug(System.currentTimeMillis() +
"Marking RMDBean as terminated " + rmdBean);
+ storageManager.getRMDBeanMgr().update(rmdBean);
+ }
+ }
+ }
+
+ } catch (SandeshaException e) {
+ if (log.isErrorEnabled())
+ log.error(e);
+ } finally {
+ transaction.commit();
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::deleteTerminatedSequences");
+ }
+
+ private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean
propertyBean)
+
+ throws SandeshaStorageException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: Sender::deleteRMSBeans");
+
+ Iterator beans = rmsBeans.iterator();
+
+ while (beans.hasNext())
+ {
+ RMSBean rmsBean = (RMSBean)beans.next();
+ long timeNow = System.currentTimeMillis();
+ long lastActivated = rmsBean.getLastActivatedTime();
+ // delete sequences that have been timedout or deleted for more than
+ // the SequenceRemovalTimeoutInterval
+ long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
+ if (deleteTime < 0)
+ deleteTime = 0;
+
+ if ((lastActivated + deleteTime) < timeNow) {
+ if (log.isDebugEnabled())
+ log.debug("Removing RMSBean " + rmsBean);
+
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: Sender::deleteRMSBeans");
+ }
}
Modified: webservices/sandesha/trunk/java/test-resources/sandesha2.properties
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test-resources/sandesha2.properties?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
--- webservices/sandesha/trunk/java/test-resources/sandesha2.properties
(original)
+++ webservices/sandesha/trunk/java/test-resources/sandesha2.properties Thu Mar
1 23:48:10 2007
@@ -7,7 +7,8 @@
AcknowledgementInterval=8000
ExponentialBackoff=false
InactivityTimeout=3
-InactivityTimeoutMeasure=hours
+InactivityTimeoutMeasure=hours
+
#Security Manager Class
#----------------------
@@ -16,3 +17,6 @@
#Storage Manager Class
#----------------------
InMemoryStorageManager=org.apache.sandesha2.storage.inmemory.InMemoryStorageManager1
+
+SequenceRemovalTimeout=1
+SequenceRemovalTimeoutMeasure=hours
Modified:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/PropertyLoaderTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/PropertyLoaderTest.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/PropertyLoaderTest.java
(original)
+++
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/PropertyLoaderTest.java
Thu Mar 1 23:48:10 2007
@@ -75,4 +75,10 @@
String secMgr = propertyBean.getSecurityManagerClass();
assertEquals(secMgr,"org.apache.sandesha2.security.SecurityManager1");
}
+
+ public void testSequenceRemovalTimeout() {
+ long value = propertyBean.getSequenceRemovalTimeoutInterval();
+ assertEquals((60*60*1*1000), value);
+ }
+
}
Modified:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/faulttests/SequenceTimedOutTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/faulttests/SequenceTimedOutTest.java?view=diff&rev=513646&r1=513645&r2=513646
==============================================================================
---
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/faulttests/SequenceTimedOutTest.java
(original)
+++
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/faulttests/SequenceTimedOutTest.java
Thu Mar 1 23:48:10 2007
@@ -19,6 +19,7 @@
import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
@@ -34,6 +35,10 @@
import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.client.SequenceReport;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.util.SandeshaUtil;
public class SequenceTimedOutTest extends SandeshaTestCase {
@@ -136,5 +141,232 @@
}
+ /**
+ * Test to check that when a sequence times out - it gets deleted after
the timeout interval.
+ *
+ * @throws Exception
+ */
+ public void testRMSSequenceTimeoutSequenceDeleted () throws Exception {
+
+ String to = "http://127.0.0.1:" + 9999 +
"/axis2/services/RMSampleService";
+
+ String repoPath = "target" + File.separator + "repos" +
File.separator + "client";
+ String axis2_xml = "target" + File.separator + "repos" +
File.separator + "client" + File.separator + "client_axis2.xml";
+
+ ConfigurationContext configContext =
ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+ Options clientOptions = new Options ();
+ clientOptions.setAction(echoAction);
+ clientOptions.setTo(new EndpointReference (to));
+
+ String sequenceKey = SandeshaUtil.getUUID();
+
clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+
+ ServiceClient serviceClient = new ServiceClient
(configContext,null);
+
+ HashMap axisServices =
configContext.getAxisConfiguration().getServices();
+
+ AxisService service = null;
+ Iterator values = axisServices.values().iterator();
+ while(values.hasNext())
+ service = (AxisService)values.next();
+
+ // Set the Sequence timout property to 1 second.
+ Iterator operations = service.getOperations();
+
+ while (operations.hasNext())
+ {
+ AxisOperation op = (AxisOperation) operations.next();
+ SandeshaPolicyBean propertyBean =
+ SandeshaUtil.getPropertyBean(op);
+
+ // Indicate that the sequence should timeout after 1 second
+ // And that it should be deleted after 2 seconds
+ if (propertyBean != null) {
+ propertyBean.setInactiveTimeoutInterval(1, "seconds");
+ propertyBean.setSequenceRemovalTimeoutInterval(2,
"seconds");
+ }
+ }
+
+ // Set a bad acks to so the CreateSequence will be refused.
+ String acksTo = AddressingConstants.Final.WSA_NONE_URI;
+
clientOptions.setProperty(SandeshaClientConstants.AcksTo,acksTo);
+
+ clientOptions.setTransportInProtocol(Constants.TRANSPORT_HTTP);
+ clientOptions.setUseSeparateListener(true);
+ clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE,
"true");
+ serviceClient.setOptions(clientOptions);
+
+ TestCallback callback1 = new TestCallback ("Callback 1");
+ serviceClient.sendReceiveNonBlocking
(getEchoOMBlock("echo1",sequenceKey),callback1);
+
+ long limit = System.currentTimeMillis() + waitTime;
+ Error lastError = null;
+ while(System.currentTimeMillis() < limit) {
+ Thread.sleep(tickTime); // Try the assertions each tick
interval, until they pass or we time out
+
+ try {
+ //assertions for the out sequence.
+ SequenceReport sequenceReport =
SandeshaClient.getOutgoingSequenceReport(serviceClient);
+
assertEquals(sequenceReport.getSequenceStatus(),SequenceReport.SEQUENCE_STATUS_TIMED_OUT);
+
assertEquals(sequenceReport.getSequenceDirection(),SequenceReport.SEQUENCE_DIRECTION_OUT);
+
+ assertTrue(callback1.isErrorRported());
+ assertEquals(callback1.getResult(),null);
+
+ lastError = null;
+ break;
+ } catch(Error e) {
+ lastError = e;
+ }
+ }
+
+ if(lastError != null) throw lastError;
+
+ while(System.currentTimeMillis() < limit) {
+ Thread.sleep(tickTime); // Try the assertions each tick
interval, until they pass or we time out
+
+ // Check that the sequence has been deleted.
+ StorageManager storageManager =
+
SandeshaUtil.getSandeshaStorageManager(configContext,
configContext.getAxisConfiguration());
+
+ Transaction tran = storageManager.getTransaction();
+
+ RMSBean finderBean = new RMSBean();
+ List rmsBeans =
storageManager.getRMSBeanMgr().find(finderBean);
+
+ tran.commit();
+
+ lastError = null;
+
+ if (!rmsBeans.isEmpty())
+ lastError = new Error("rmsBeans not empty " +
rmsBeans);
+ else
+ break;
+
+ }
+
+ if(lastError != null) throw lastError;
+
+ configContext.getListenerManager().stop();
+ serviceClient.cleanup();
+
+ }
+
+ private static final String server_repoPath = "target" + File.separator
+ "repos" + File.separator + "server";
+ private static final String server_axis2_xml = "target" +
File.separator + "repos" + File.separator + "server" + File.separator +
"server_axis2.xml";
+
+ /**
+ * Checks that an RMDSequence is terminated once the timeout interval
arrives.
+ * Also that the RMDBean is deleted once the SequenceRemovalTimeout
arrives.
+ */
+ public void testRMDSequenceTerminatedDeleted() throws Exception{
+ ConfigurationContext serverConfigContext =
startServer(server_repoPath, server_axis2_xml);
+
+ String to = "http://127.0.0.1:" + serverPort +
"/axis2/services/RMSampleService";
+
+ String repoPath = "target" + File.separator + "repos" +
File.separator + "client";
+ String axis2_xml = "target" + File.separator + "repos" +
File.separator + "client" + File.separator + "client_axis2.xml";
+
+ ConfigurationContext configContext =
ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+ Options clientOptions = new Options ();
+ clientOptions.setAction(pingAction);
+ clientOptions.setTo(new EndpointReference (to));
+
+ String sequenceKey = SandeshaUtil.getUUID();
+
clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+
+ ServiceClient serviceClient = new ServiceClient
(configContext,null);
+ serviceClient.setOptions(clientOptions);
+
+ HashMap axisServices =
serverConfigContext.getAxisConfiguration().getServices();
+
+ AxisService service = null;
+ Iterator values = axisServices.values().iterator();
+ while(values.hasNext())
+ service = (AxisService)values.next();
+
+ // Set the Sequence timout property to 1 second.
+ Iterator operations = service.getOperations();
+
+ while (operations.hasNext())
+ {
+ AxisOperation op = (AxisOperation) operations.next();
+ SandeshaPolicyBean propertyBean =
+ SandeshaUtil.getPropertyBean(op);
+
+ // Indicate that the sequence should timeout after 2 second
+ // And that it should be deleted after 2 seconds
+ if (propertyBean != null) {
+ propertyBean.setInactiveTimeoutInterval(2, "seconds");
+ propertyBean.setSequenceRemovalTimeoutInterval(2,
"seconds");
+ }
+ }
+
+ // Send a single ping message
+ serviceClient.fireAndForget(getPingOMBlock("ping1"));
+
+ long limit = System.currentTimeMillis() + waitTime;
+ Error lastError = null;
+ while(System.currentTimeMillis() < limit) {
+ Thread.sleep(tickTime); // Try the assertions each tick
interval, until they pass or we time out
+
+ // Check that the sequence has been deleted.
+ StorageManager storageManager =
+
SandeshaUtil.getSandeshaStorageManager(serverConfigContext,
serverConfigContext.getAxisConfiguration());
+
+ Transaction tran = storageManager.getTransaction();
+
+ RMDBean finderBean = new RMDBean();
+ List rmdBeans =
storageManager.getRMDBeanMgr().find(finderBean);
+
+ tran.commit();
+
+ lastError = null;
+
+ if (rmdBeans.isEmpty())
+ lastError = new Error("rmdBeans empty " +
rmdBeans);
+ else {
+ RMDBean bean = (RMDBean)rmdBeans.get(0);
+ if (bean.isTerminated())
+ break;
+
+ lastError = new Error("RMDBean not deleted " +
bean);
+ }
+ }
+
+ if(lastError != null) throw lastError;
+
+ while(System.currentTimeMillis() < limit) {
+ Thread.sleep(tickTime); // Try the assertions each tick
interval, until they pass or we time out
+
+ // Check that the sequence has been deleted.
+ StorageManager storageManager =
+
SandeshaUtil.getSandeshaStorageManager(serverConfigContext,
serverConfigContext.getAxisConfiguration());
+
+ Transaction tran = storageManager.getTransaction();
+
+ RMDBean finderBean = new RMDBean();
+ List rmdBeans =
storageManager.getRMDBeanMgr().find(finderBean);
+
+ tran.commit();
+
+ lastError = null;
+
+ if (!rmdBeans.isEmpty())
+ lastError = new Error("rmdBeans not empty " +
rmdBeans);
+ else
+ break;
+ }
+
+ if(lastError != null) throw lastError;
+
+ configContext.getListenerManager().stop();
+ serviceClient.cleanup();
+
+ }
+
}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]