Author: chamikara
Date: Mon Apr 17 23:57:27 2006
New Revision: 394865
URL: http://svn.apache.org/viewcvs?rev=394865&view=rev
Log:
Addded the SandeshaListner interface.
Users can register an object of this to get notified for special events (for
e.g. RM faults, sequence time outs).
Additional methods to the Sandeshaclient.
Corrected the timing out logic. Now a sequence can timeout when the maximum
number of retransmissions exceed or depending on the last activated time. In a
timeout, a registered SandeshaListner object will be invoked.
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java
Removed:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaFaultCallback.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClient.java
Mon Apr 17 23:57:27 2006
@@ -588,7 +588,7 @@
return
getOutgoingSequenceReport(internalSequenceID,configurationContext);
}
- private static SequenceReport getOutgoingSequenceReport (String
internalSequenceID,ConfigurationContext configurationContext) throws
SandeshaException {
+ public static SequenceReport getOutgoingSequenceReport (String
internalSequenceID,ConfigurationContext configurationContext) throws
SandeshaException {
SequenceReport sequenceReport = new SequenceReport ();
sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_OUT);
@@ -599,12 +599,16 @@
Transaction reportTransaction = storageManager.getTransaction();
+ sequenceReport.setInternalSequenceID(internalSequenceID);
+
CreateSeqBean createSeqFindBean = new CreateSeqBean ();
createSeqFindBean.setInternalSequenceID(internalSequenceID);
CreateSeqBean createSeqBean =
createSeqMgr.findUnique(createSeqFindBean);
+ //if data not is available sequence has to be terminated or
timedOut.
if (createSeqBean==null) {
+
//check weather this is an terminated sequence.
if
(isSequenceTerminated(internalSequenceID,seqPropMgr)) {
fillTerminatedOutgoingSequenceInfo
(sequenceReport,internalSequenceID,seqPropMgr);
@@ -617,10 +621,16 @@
return sequenceReport;
}
+
+ //sequence must hv been timed out before establiching.
No other posibility I can think of.
+ //this does not get recorded since there is no key
(which is normally the sequenceID) to store it.
+ //(properties with key as the internalSequenceID get
deleted in timing out)
- String message = "Unrecorder internalSequenceID";
- log.debug(message);
- return null;
+ //so, setting the sequence status to INITIAL
+
sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_INITIAL);
+
+ //returning the current sequence report.
+ return sequenceReport;
}
String outSequenceID = createSeqBean.getSequenceID();
@@ -633,7 +643,6 @@
}
sequenceReport.setSequenceStatus(SequenceReport.SEQUENCE_STATUS_ESTABLISHED);
-
fillOutgoingSequenceInfo(sequenceReport,outSequenceID,seqPropMgr);
reportTransaction.commit();
@@ -678,7 +687,6 @@
}
String outSequenceID = internalSequenceBean.getSequenceID();
-
SequencePropertyBean sequenceTerminatedBean =
seqPropMgr.retrieve(outSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT);
if (sequenceTerminatedBean!=null &&
Sandesha2Constants.VALUE_TRUE.equals(sequenceTerminatedBean.getValue())) {
return true;
@@ -798,7 +806,7 @@
return sequenceReport;
}
- private static String getInternalSequenceID (String to, String
sequenceKey) {
+ public static String getInternalSequenceID (String to, String
sequenceKey) {
return SandeshaUtil.getInternalSequenceID(to,sequenceKey);
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaClientConstants.java
Mon Apr 17 23:57:27 2006
@@ -25,5 +25,5 @@
public static String MESSAGE_NUMBER = "Sandesha2MessageNumber";
public static String RM_SPEC_VERSION = "Sandesha2RMSpecVersion";
public static String DUMMY_MESSAGE = "Sandesha2DummyMessage"; //If this
property is set, even though this message will invoke the RM handlers, this
will not be sent as an actual application message
- public static String RM_FAULT_CALLBACK = "Sandesha2RMFaultCallback";
+ public static String SANDESHA_LISTENER = "Sandesha2Listener";
}
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java?rev=394865&view=auto
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java
(added)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SandeshaListener.java
Mon Apr 17 23:57:27 2006
@@ -0,0 +1,43 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.client;
+
+import org.apache.axis2.AxisFault;
+
+/**
+ * By implementing this interface and registering an object with
+ * Sandesha2, users will be invoked in some events.
+ *
+ * @author Chamikara Jayalath <[EMAIL PROTECTED]>
+ */
+
+public interface SandeshaListener {
+
+ /**
+ * This sill be invoked when Sandesha2 receive a fault message
+ * in response to a RM control message that was sent by it.
+ */
+ public void onError(AxisFault fault);
+
+ /**
+ * This will be invoked when a specific sequence time out.
+ * The timing out method depends on policies.
+ */
+ public void onTimeOut(SequenceReport report);
+
+}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Mon Apr 17 23:57:27 2006
@@ -40,7 +40,7 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaFaultCallback;
+import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
import org.apache.sandesha2.storage.StorageManager;
@@ -90,7 +90,7 @@
if (requestMessage!=null) {
if(SandeshaUtil.isRetriableOnFaults(requestMessage)){
- SandeshaFaultCallback
faultCallback = (SandeshaFaultCallback)
operationContext.getProperty(SandeshaClientConstants.RM_FAULT_CALLBACK);
+ SandeshaListener
faultCallback = (SandeshaListener)
operationContext.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
if
(faultCallback!=null) {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Mon Apr 17 23:57:27 2006
@@ -114,10 +114,7 @@
if (dummyMessageString!=null &&
Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
dummyMessage = true;
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
-
-
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(context);
MsgProcessor msgProcessor = null;
int messageType = rmMsgCtx.getMessageType();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Mon Apr 17 23:57:27 2006
@@ -131,10 +131,12 @@
return;
}
- //updating the last activated time of the sequence.
-// Transaction lastUpdatedTimeTransaction =
storageManager.getTransaction();
-//
SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
-// lastUpdatedTimeTransaction.commit();
+ String internalSequenceID =
SandeshaUtil.getSequenceProperty(outSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configCtx);
+
+ //updating the last activated time of the sequence.
+ Transaction lastUpdatedTimeTransaction =
storageManager.getTransaction();
+
SequenceManager.updateLastActivatedTime(internalSequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
+ lastUpdatedTimeTransaction.commit();
//Starting transaction
Transaction ackTransaction = storageManager.getTransaction();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Mon Apr 17 23:57:27 2006
@@ -42,7 +42,7 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaFaultCallback;
+import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
@@ -438,11 +438,11 @@
ConfigurationContext configContext = msgContext
.getConfigurationContext();
//setting the Fault callback
- SandeshaFaultCallback faultCallback = (SandeshaFaultCallback)
msgContext.getOptions().getProperty(SandeshaClientConstants.RM_FAULT_CALLBACK);
+ SandeshaListener faultCallback = (SandeshaListener)
msgContext.getOptions().getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
if (faultCallback!=null) {
OperationContext operationContext =
msgContext.getOperationContext();
if (operationContext!=null) {
-
operationContext.setProperty(SandeshaClientConstants.RM_FAULT_CALLBACK,faultCallback);
+
operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER,faultCallback);
}
}
@@ -848,7 +848,7 @@
createSeqEntry.setMessageContextRefKey(key);
createSeqEntry.setTimeToSend(System.currentTimeMillis());
createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
-
+ createSeqEntry.setInternalSequenceID(internalSequenceId);
// this will be set to true in the sender
createSeqEntry.setSend(true);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Mon Apr 17 23:57:27 2006
@@ -32,7 +32,7 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaFaultCallback;
+import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
@@ -230,12 +230,12 @@
MessageContext msgCtx = rmMsgCtx.getMessageContext();
- //adding the RM_FAULT_CALLBACK
- SandeshaFaultCallback faultCallback = (SandeshaFaultCallback)
msgCtx.getOptions().getProperty(SandeshaClientConstants.RM_FAULT_CALLBACK);
+ //adding the SANDESHA_LISTENER
+ SandeshaListener faultCallback = (SandeshaListener)
msgCtx.getOptions().getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
if (faultCallback!=null) {
OperationContext operationContext =
msgCtx.getOperationContext();
if (operationContext!=null) {
-
operationContext.setProperty(SandeshaClientConstants.RM_FAULT_CALLBACK,faultCallback);
+
operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER,faultCallback);
}
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Mon Apr 17 23:57:27 2006
@@ -288,9 +288,8 @@
updateAppMessagesTransaction.commit();
Transaction lastUpdatedTimeTransaction =
storageManager.getTransaction();
-
SequenceManager.updateLastActivatedTime(newOutSequenceId,configCtx);
+
SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
lastUpdatedTimeTransaction.commit();
-
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
Mon Apr 17 23:57:27 2006
@@ -80,7 +80,7 @@
SenderBean findBean = new SenderBean();
String sequnceID = SandeshaUtil.getSequenceIDFromRMMessage
(rmMessageContext);
-
+
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
findBean.setSend(true);
findBean.setReSend(false);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Mon Apr 17 23:57:27 2006
@@ -22,8 +22,13 @@
import org.apache.axis2.description.Parameter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClient;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.client.SequenceReport;
import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -38,7 +43,7 @@
Log log = LogFactory.getLog( getClass());
- public SenderBean adjustRetransmittion(
+ public boolean adjustRetransmittion(
SenderBean retransmitterBean,ConfigurationContext
configContext) throws SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
@@ -48,18 +53,36 @@
throw new SandeshaException ("Stored Key not present in
the retransmittable message");
MessageContext messageContext =
storageManager.retrieveMessageContext(storedKey,configContext);
-
-
+ RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(messageContext);
+
+ String internalSequenceID =
retransmitterBean.getInternalSequenceID();
+ String sequenceID = retransmitterBean.getSequenceID();
+
SandeshaPropertyBean propertyBean =
SandeshaUtil.getPropretyBean(messageContext);
retransmitterBean.setSentCount(retransmitterBean.getSentCount()
+ 1);
adjustNextRetransmissionTime(retransmitterBean, propertyBean);
int maxRetransmissionAttempts =
propertyBean.getMaximumRetransmissionCount();
+
+ boolean timeOutSequence = false;
if (maxRetransmissionAttempts>=0 &&
retransmitterBean.getSentCount() > maxRetransmissionAttempts)
+ timeOutSequence = true;
+
+ boolean sequenceTimedOut =
SequenceManager.hasSequenceTimedOut(internalSequenceID, rmMsgCtx);
+ if (sequenceTimedOut)
+ timeOutSequence = true;
+
+ boolean continueSending = true;
+ if (timeOutSequence) {
stopRetransmission(retransmitterBean);
-
- return retransmitterBean;
+
+ //Only messages of outgoing sequences get
retransmitted. So named following method according to that.
+ finalizeTimedOutSequence
(internalSequenceID,sequenceID, messageContext);
+ continueSending = false;
+ }
+
+ return continueSending;
}
/**
@@ -108,6 +131,17 @@
}
return interval;
+ }
+
+ private void finalizeTimedOutSequence (String internalSequenceID,
String sequenceID ,MessageContext messageContext) throws SandeshaException {
+ ConfigurationContext configurationContext =
messageContext.getConfigurationContext();
+ SequenceReport report =
SandeshaClient.getOutgoingSequenceReport(internalSequenceID
,configurationContext);
+
TerminateManager.timeOutSendingSideSequence(configurationContext,internalSequenceID,
false);
+
+ SandeshaListener listener = (SandeshaListener)
messageContext.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
+ if (listener!=null) {
+ listener.onTimeOut(report);
+ }
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
Mon Apr 17 23:57:27 2006
@@ -311,6 +311,9 @@
specVerionBean.setValue(specVersion);
seqPropMgr.insert(specVerionBean);
+ //updating the last activated time.
+
updateLastActivatedTime(internalSequenceId,configurationContext);
+
SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
updateClientSideListnerIfNeeded
(firstAplicationMsgCtx,anonymousURI);
@@ -357,19 +360,19 @@
* @param configContext
* @throws SandeshaException
*/
- public static void updateLastActivatedTime (String sequenceID,
ConfigurationContext configContext) throws SandeshaException {
+ public static void updateLastActivatedTime (String propertyKey,
ConfigurationContext configContext) throws SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
//Transaction lastActivatedTransaction =
storageManager.getTransaction();
SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
- SequencePropertyBean lastActivatedBean =
sequencePropertyBeanMgr.retrieve(sequenceID,
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ SequencePropertyBean lastActivatedBean =
sequencePropertyBeanMgr.retrieve(propertyKey,
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
boolean added = false;
if (lastActivatedBean==null) {
added = true;
lastActivatedBean = new SequencePropertyBean ();
- lastActivatedBean.setSequenceID(sequenceID);
+ lastActivatedBean.setSequenceID(propertyKey);
lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
}
@@ -385,12 +388,12 @@
}
- public static long getLastActivatedTime (String sequenceID,
ConfigurationContext configContext) throws SandeshaException {
+ public static long getLastActivatedTime (String propertyKey,
ConfigurationContext configContext) throws SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropretyBeanMgr();
- SequencePropertyBean lastActivatedBean =
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ SequencePropertyBean lastActivatedBean =
seqPropBeanMgr.retrieve(propertyKey,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
long lastActivatedTime = -1;
@@ -401,7 +404,7 @@
return lastActivatedTime;
}
- public static boolean hasSequenceTimedOut (String sequenceID,
RMMsgContext rmMsgCtx) throws SandeshaException {
+ public static boolean hasSequenceTimedOut (String propertyKey,
RMMsgContext rmMsgCtx) throws SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropretyBeanMgr();
@@ -422,7 +425,7 @@
//SequencePropertyBean lastActivatedBean =
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
//if (lastActivatedBean!=null) {
- long lastActivatedTime =
getLastActivatedTime(sequenceID,rmMsgCtx.getMessageContext().getConfigurationContext());
+ long lastActivatedTime =
getLastActivatedTime(propertyKey,rmMsgCtx.getMessageContext().getConfigurationContext());
long timeNow = System.currentTimeMillis();
if (lastActivatedTime>0 &&
(lastActivatedTime+policyBean.getInactiveTimeoutInterval()<timeNow))
sequenceTimedOut = true;
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
Mon Apr 17 23:57:27 2006
@@ -184,15 +184,16 @@
* @param sequenceID
* @throws SandeshaException
*/
- public static void terminateSendingSide (ConfigurationContext
configContext, String sequenceID,boolean serverSide) throws SandeshaException {
+ public static void terminateSendingSide (ConfigurationContext
configContext, String internalSequenceID,boolean serverSide) throws
SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropretyBeanMgr();
- SequencePropertyBean seqTerminatedBean = new
SequencePropertyBean
(sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
+
+ SequencePropertyBean seqTerminatedBean = new
SequencePropertyBean
(internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
seqPropMgr.insert(seqTerminatedBean);
- cleanSendingSideData(configContext,sequenceID,serverSide);
+
cleanSendingSideData(configContext,internalSequenceID,serverSide);
}
@@ -200,8 +201,26 @@
private static void doUpdatesIfNeeded (String sequenceID,
SequencePropertyBean propertyBean, SequencePropertyBeanMgr seqPropMgr) throws
SandeshaException {
+
+ boolean addEntryWithSequenceID = false;
+
if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES))
{
-
+ addEntryWithSequenceID = true;
+ }
+
+ if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED))
{
+ addEntryWithSequenceID = true;
+ }
+
+ if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED))
{
+ addEntryWithSequenceID = true;
+ }
+
+ if
(propertyBean.getName().equals(Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT))
{
+ addEntryWithSequenceID = true;
+ }
+
+ if (addEntryWithSequenceID && sequenceID!=null) {
//this value cannot be completely deleted since this
data will be needed by SequenceReports
//so saving it with the sequenceID value being the out
sequenceID.
@@ -213,8 +232,10 @@
seqPropMgr.insert(newBean);
//TODO amazingly this property does not seem to get
deleted without following - in the hibernate impl
//(even though the lines efter current methodcall do
this).
- seqPropMgr.delete
(propertyBean.getSequenceID(),propertyBean.getName());
+ seqPropMgr.delete
(propertyBean.getSequenceID(),propertyBean.getName());
}
+
+
}
private static boolean isProportyDeletable (String name) {
@@ -244,60 +265,45 @@
return deleatable;
}
- public static void timeOutSendingSideSequence (ConfigurationContext
context,String sequenceID, boolean serverside) throws SandeshaException {
+ public static void timeOutSendingSideSequence (ConfigurationContext
context,String internalSequenceID, boolean serverside) throws SandeshaException
{
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(context);
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropretyBeanMgr();
- SequencePropertyBean seqTerminatedBean = new
SequencePropertyBean
(sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE);
+ SequencePropertyBean seqTerminatedBean = new
SequencePropertyBean
(internalSequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TIMED_OUT,Sandesha2Constants.VALUE_TRUE);
seqPropMgr.insert(seqTerminatedBean);
- cleanSendingSideData(context,sequenceID,serverside);
+ cleanSendingSideData(context,internalSequenceID,serverside);
}
- private static void cleanSendingSideData (ConfigurationContext
configContext,String sequenceID, boolean serverSide) throws SandeshaException {
+ private static void cleanSendingSideData (ConfigurationContext
configContext,String internalSequenceID, boolean serverSide) throws
SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
SenderBeanMgr retransmitterBeanMgr =
storageManager.getRetransmitterBeanMgr();
CreateSeqBeanMgr createSeqBeanMgr =
storageManager.getCreateSeqBeanMgr();
- SequencePropertyBean sequenceTerminatedBean = new
SequencePropertyBean
(sequenceID,Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED,Sandesha2Constants.VALUE_TRUE);
- sequencePropertyBeanMgr.insert(sequenceTerminatedBean);
+ String outSequenceID =
SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID,configContext);
if (!serverSide) {
- //stpoing the listner for the client side.
-
- //SequencePropertyBean outGoingAcksToBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQ_ACKSTO);
-
boolean stopListnerForAsyncAcks = false;
- SequencePropertyBean internalSequenceBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- if (internalSequenceBean!=null) {
- String internalSequenceID =
internalSequenceBean.getValue();
- SequencePropertyBean acksToBean =
sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+ SequencePropertyBean acksToBean =
sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
- String addressingNamespace =
SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configContext);
- String anonymousURI =
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
+ String addressingNamespace =
SandeshaUtil.getSequenceProperty(internalSequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configContext);
+ String anonymousURI =
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
- if (acksToBean!=null) {
- String acksTo = acksToBean.getValue();
- if (acksTo!=null &&
!anonymousURI.equals(acksTo)) {
- stopListnerForAsyncAcks = true;
- }
+ if (acksToBean!=null) {
+ String acksTo = acksToBean.getValue();
+ if (acksTo!=null &&
!anonymousURI.equals(acksTo)) {
+ stopListnerForAsyncAcks = true;
}
}
}
- SequencePropertyBean internalSequenceBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
- if (internalSequenceBean==null)
- throw new SandeshaException ("TempSequence entry not
found");
-
- String internalSequenceId = (String)
internalSequenceBean.getValue();
-
//removing retransmitterMgr entries
//SenderBean findRetransmitterBean = new SenderBean ();
//findRetransmitterBean.setInternalSequenceID(internalSequenceId);
- Collection collection =
retransmitterBeanMgr.find(internalSequenceId);
+ Collection collection =
retransmitterBeanMgr.find(internalSequenceID);
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
SenderBean retransmitterBean = (SenderBean)
iterator.next();
@@ -306,7 +312,7 @@
//removing the createSeqMgrEntry
CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
-
findCreateSequenceBean.setInternalSequenceID(internalSequenceId);
+
findCreateSequenceBean.setInternalSequenceID(internalSequenceID);
collection = createSeqBeanMgr.find(findCreateSequenceBean);
iterator = collection.iterator();
while (iterator.hasNext()) {
@@ -316,19 +322,19 @@
//removing sequence properties
SequencePropertyBean findSequencePropertyBean1 = new
SequencePropertyBean ();
- findSequencePropertyBean1.setSequenceID(internalSequenceId);
+ findSequencePropertyBean1.setSequenceID(internalSequenceID);
collection =
sequencePropertyBeanMgr.find(findSequencePropertyBean1);
iterator = collection.iterator();
while (iterator.hasNext()) {
SequencePropertyBean sequencePropertyBean =
(SequencePropertyBean) iterator.next();
- doUpdatesIfNeeded
(sequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
+ doUpdatesIfNeeded
(outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr);
if
(isProportyDeletable(sequencePropertyBean.getName())) {
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName());
}
}
- SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
+ SandeshaUtil.stopSenderForTheSequence(internalSequenceID);
}
public static void addTerminateSequenceMessage(RMMsgContext
referenceMessage,
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=394865&r1=394864&r2=394865&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon
Apr 17 23:57:27 2006
@@ -124,7 +124,9 @@
MessageRetransmissionAdjuster
retransmitterAdjuster = new MessageRetransmissionAdjuster();
-
retransmitterAdjuster.adjustRetransmittion(senderBean, context);
+ boolean continueSending =
retransmitterAdjuster.adjustRetransmittion(senderBean, context);
+ if (!continueSending)
+ continue;
pickMessagesToSendTransaction.commit();
@@ -169,17 +171,7 @@
if (messageType ==
Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
String sequenceID =
sequence.getIdentifier().getIdentifier();
- // checking weather the sequence has
been timed out.
- boolean sequenceTimedOut =
SequenceManager.hasSequenceTimedOut(sequenceID, rmMsgCtx);
- if (sequenceTimedOut) {
- // sequence has been timed out.
- // do time out processing.
- // TODO uncomment below line
-
TerminateManager.timeOutSendingSideSequence(context,sequenceID,
msgCtx.isServerSide());
- String message = "Sequence
timed out";
- log.debug(message);
- throw new
SandeshaException(message);
- }
+
}
//checking weather this message can carry
piggybacked acks
@@ -235,7 +227,9 @@
TerminateSequence terminateSequence =
(TerminateSequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
String sequenceID =
terminateSequence.getIdentifier().getIdentifier();
ConfigurationContext configContext =
msgCtx.getConfigurationContext();
-
TerminateManager.terminateSendingSide(configContext,sequenceID,
msgCtx.isServerSide());
+
+ String internalSequenceID =
SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configContext);
+
TerminateManager.terminateSendingSide(configContext,internalSequenceID,
msgCtx.isServerSide());
}
terminateCleaningTransaction.commit();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]