Author: chamikara
Date: Tue Dec 27 05:06:25 2005
New Revision: 359208
URL: http://svn.apache.org/viewcvs?rev=359208&view=rev
Log:
Bug fixes.
Corrected inactivity timeout logic.
Some changes to improve performance.
Corrections in the transaction logic.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
Tue Dec 27 05:06:25 2005
@@ -23,6 +23,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -41,7 +42,8 @@
public class AcknowledgementManager {
/**
- * Piggybacks any available acks of the same sequence to the given
application message.
+ * Piggybacks any available acks of the same sequence to the given
+ * application message.
*
* @param applicationRMMsgContext
* @throws SandeshaException
@@ -52,6 +54,7 @@
.getMessageContext().getConfigurationContext();
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(configurationContext);
+
SenderBeanMgr retransmitterBeanMgr = storageManager
.getRetransmitterBeanMgr();
SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager
@@ -68,7 +71,8 @@
String sequenceId = sequence.getIdentifier().getIdentifier();
SequencePropertyBean internalSequenceBean =
sequencePropertyBeanMgr
- .retrieve(sequenceId,
+ .retrieve(
+ sequenceId,
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
if (internalSequenceBean == null)
throw new SandeshaException("Temp Sequence is not set");
@@ -82,27 +86,32 @@
Iterator it = collection.iterator();
if (it.hasNext()) {
+
SenderBean ackBean = (SenderBean) it.next();
- //deleting the ack entry.
- retransmitterBeanMgr.delete(ackBean.getMessageID());
+ long timeNow = System.currentTimeMillis();
+ if (ackBean.getTimeToSend() > timeNow) { //Piggybacking
will happen only if the end of ack interval (timeToSend) is not reached.
- //Adding the ack to the application message
- MessageContext ackMsgContext = SandeshaUtil
-
.getStoredMessageContext(ackBean.getMessageContextRefKey());
- RMMsgContext ackRMMsgContext = MsgInitializer
- .initializeMessage(ackMsgContext);
- if (ackRMMsgContext.getMessageType() !=
Sandesha2Constants.MessageTypes.ACK)
- throw new SandeshaException("Invalid ack
message entry");
-
- SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) ackRMMsgContext
-
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
- applicationRMMsgContext.setMessagePart(
-
Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
- sequenceAcknowledgement);
+ //deleting the ack entry.
+
retransmitterBeanMgr.delete(ackBean.getMessageID());
- applicationRMMsgContext.addSOAPEnvelope();
- }
+ //Adding the ack to the application message
+ MessageContext ackMsgContext = SandeshaUtil
+ .getStoredMessageContext(ackBean
+
.getMessageContextRefKey());
+ RMMsgContext ackRMMsgContext = MsgInitializer
+
.initializeMessage(ackMsgContext);
+ if (ackRMMsgContext.getMessageType() !=
Sandesha2Constants.MessageTypes.ACK)
+ throw new SandeshaException("Invalid
ack message entry");
+
+ SequenceAcknowledgement sequenceAcknowledgement
= (SequenceAcknowledgement) ackRMMsgContext
+
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ applicationRMMsgContext.setMessagePart(
+
Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+ sequenceAcknowledgement);
+ applicationRMMsgContext.addSOAPEnvelope();
+ }
+ }
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
Tue Dec 27 05:06:25 2005
@@ -178,6 +178,8 @@
String OFFERED_SEQUENCE = "OfferedSequence";
String TERMINATE_ADDED = "TerminateAdded";
+
+ String LAST_ACTIVATED_TIME = "LastActivatedTime";
}
public interface SOAPVersion {
@@ -298,11 +300,11 @@
int INVOKER_SLEEP_TIME = 1000;
- int SENDER_SLEEP_TIME = 1000;
+ int SENDER_SLEEP_TIME = 500;
int CLIENT_SLEEP_TIME = 10000;
- int TERMINATE_DELAY = 1000;
+ int TERMINATE_DELAY = 100;
String TEMP_SEQUENCE_ID = "uuid:tempID";
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Tue
Dec 27 05:06:25 2005
@@ -18,8 +18,11 @@
package org.apache.sandesha2;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.modules.Module;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
* The Module class of Sandesha2.
@@ -32,12 +35,21 @@
// initialize the module
public void init(AxisConfiguration axisSystem) throws AxisFault {
-
+ cleanStorage (axisSystem);
}
// shutdown the module
public void shutdown(AxisConfiguration axisSystem) throws AxisFault {
+ }
+
+ private void cleanStorage (AxisConfiguration axisSystem) throws
AxisFault {
+
+ ConfigurationContext configurationContext = new
ConfigurationContext (axisSystem);
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext);
+
+ storageManager.initStorage();
+
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Tue Dec 27 05:06:25 2005
@@ -85,12 +85,12 @@
*/
public static void terminateAfterInvocation (ConfigurationContext
configContext, String sequenceID) throws SandeshaException {
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
- SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
InvokerBeanMgr storageMapBeanMgr =
storageManager.getStorageMapBeanMgr();
//removing storageMap entries
InvokerBean findStorageMapBean = new InvokerBean ();
findStorageMapBean.setSequenceID(sequenceID);
+ findStorageMapBean.setInvoked(true);
Collection collection =
storageMapBeanMgr.find(findStorageMapBean);
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
@@ -98,6 +98,13 @@
storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
}
+ removeReceivingSideProperties(configContext,sequenceID);
+
+ }
+
+ private static void removeReceivingSideProperties (ConfigurationContext
configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
SequencePropertyBean allSequenceBean =
sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
ArrayList allSequenceList =
SandeshaUtil.getArrayListFromString(allSequenceBean.getValue());
allSequenceList.remove(sequenceID);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Tue Dec 27 05:06:25 2005
@@ -218,9 +218,23 @@
acksTo = (String) msgCtx
.getProperty(Sandesha2ClientAPI.AcksTo);
}
+
+ if (msgCtx.isServerSide()) {
+ //we do not set acksTo value to
anonymous when the create sequence is send from the server.
+
+ MessageContext requestMessage =
operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMessage==null) {
+ throw new SandeshaException
("Request message is not present");
+ }
+
+ acksTo =
requestMessage.getTo().getAddress();
+
+ } else {
+ if (acksTo == null)
+ acksTo =
Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
+ }
+
- if (acksTo == null)
- acksTo =
Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
//If acksTo is not anonymous. Start the listner
TODO: verify
if
(!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Tue Dec 27 05:06:25 2005
@@ -36,6 +36,7 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Nack;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -80,6 +81,9 @@
if (outSequenceId == null || "".equals(outSequenceId))
throw new SandeshaException("OutSequenceId is null");
+ //updating the last activated time of the sequence.
+
SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
+
SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
outSequenceId,
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
@@ -154,8 +158,6 @@
addTerminateSequenceMessage(rmMsgCtx,
outSequenceId,
internalSequenceId);
}
-
-
}
//stopping the progress of the message further.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Tue Dec 27 05:06:25 2005
@@ -53,6 +53,7 @@
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.LastMessage;
import org.apache.sandesha2.wsrm.Sequence;
@@ -83,23 +84,28 @@
if (msgCtx == null)
throw new SandeshaException("Message context is null");
- if
(rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
- &&
rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE)
- .equals("true")) {
+ if (rmMsgCtx
+
.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+ && rmMsgCtx.getProperty(
+
Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals(
+ "true")) {
return;
}
- //RM will not rend sync responses. If sync acks are there this
will be made true again later.
- if(rmMsgCtx.getMessageContext().getOperationContext()!=null) {
-
rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,Constants.VALUE_FALSE);
+ //RM will not rend sync responses. If sync acks are there this
will be
+ // made true again later.
+ if (rmMsgCtx.getMessageContext().getOperationContext() != null)
{
+
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+ Constants.RESPONSE_WRITTEN,
Constants.VALUE_FALSE);
}
-
+
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
.getConfigurationContext());
-
- Transaction applicationMsgTransaction =
storageManager.getTransaction();
-
+
+ Transaction updataMsgStringTransaction = storageManager
+ .getTransaction();
+
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
@@ -112,6 +118,9 @@
if (configCtx == null)
throw new SandeshaException("Configuration Context is
null");
+ //updating the last activated time of the sequence.
+ SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
+
SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
@@ -127,7 +136,8 @@
// EXACTLY_ONCE.
//msgCtx.pause();
- rmMsgCtx.getMessageContext().setPausedTrue(new QName
(Sandesha2Constants.IN_HANDLER_NAME));
+ rmMsgCtx.getMessageContext().setPausedTrue(
+ new
QName(Sandesha2Constants.IN_HANDLER_NAME));
}
@@ -139,7 +149,11 @@
msgsBean.setValue(messagesStr);
seqPropMgr.update(msgsBean);
- sendAckIfNeeded(rmMsgCtx, messagesStr);
+ updataMsgStringTransaction.commit();
+
+
+
+ Transaction invokeTransaction = storageManager.getTransaction();
// Pause the messages bean if not the right message to
invoke.
NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
@@ -152,82 +166,71 @@
long nextMsgno = bean.getNextMsgNoToProcess();
- if (msgCtx.isServerSide()) {
- boolean inOrderInvocation =
PropertyManager.getInstance().isInOrderInvocation();
- if (inOrderInvocation) {
- //pause the message
- //msgCtx.pause();
- rmMsgCtx.getMessageContext().setPausedTrue(new
QName (Sandesha2Constants.IN_HANDLER_NAME));
- SequencePropertyBean incomingSequenceListBean =
(SequencePropertyBean) seqPropMgr
- .retrieve(
-
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
- if (incomingSequenceListBean == null) {
- ArrayList incomingSequenceList = new
ArrayList();
- incomingSequenceListBean = new
SequencePropertyBean();
- incomingSequenceListBean
-
.setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
- incomingSequenceListBean
-
.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
incomingSequenceListBean.setValue(incomingSequenceList.toString());
+ boolean inOrderInvocation = PropertyManager.getInstance()
+ .isInOrderInvocation();
+ if (inOrderInvocation) {
+ //pause the message
+ //msgCtx.pause();
+ rmMsgCtx.getMessageContext().setPausedTrue(
+ new
QName(Sandesha2Constants.IN_HANDLER_NAME));
+ SequencePropertyBean incomingSequenceListBean =
(SequencePropertyBean) seqPropMgr
+ .retrieve(
+
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (incomingSequenceListBean == null) {
+ ArrayList incomingSequenceList = new
ArrayList();
+ incomingSequenceListBean = new
SequencePropertyBean();
+ incomingSequenceListBean
+
.setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+ incomingSequenceListBean
+
.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
incomingSequenceListBean.setValue(incomingSequenceList
+ .toString());
-
seqPropMgr.insert(incomingSequenceListBean);
- }
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
- ArrayList incomingSequenceList =
SandeshaUtil.getArrayListFromString(incomingSequenceListBean
- .getValue());
+ ArrayList incomingSequenceList = SandeshaUtil
+
.getArrayListFromString(incomingSequenceListBean.getValue());
- //Adding current sequence to the incoming
sequence List.
- if (!incomingSequenceList.contains(sequenceId))
{
- incomingSequenceList.add(sequenceId);
-
- //saving the property.
-
incomingSequenceListBean.setValue(incomingSequenceList.toString());
-
seqPropMgr.insert(incomingSequenceListBean);
- }
+ //Adding current sequence to the incoming sequence List.
+ if (!incomingSequenceList.contains(sequenceId)) {
+ incomingSequenceList.add(sequenceId);
+
+ //saving the property.
+
incomingSequenceListBean.setValue(incomingSequenceList
+ .toString());
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
- //saving the message.
- try {
- String key =
SandeshaUtil.storeMessageContext(rmMsgCtx
- .getMessageContext());
- storageMapMgr.insert(new
InvokerBean(key, msgNo,
- sequenceId));
-
- //This will avoid performing
application processing more
- // than
- // once.
-
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
- "true");
+ //saving the message.
+ try {
+ String key =
SandeshaUtil.storeMessageContext(rmMsgCtx
+ .getMessageContext());
+ storageMapMgr.insert(new InvokerBean(key,
msgNo, sequenceId));
+
+ //This will avoid performing application
processing more
+ // than
+ // once.
+ rmMsgCtx.setProperty(
+
Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
- } catch (Exception ex) {
- throw new
SandeshaException(ex.getMessage());
- }
+ } catch (Exception ex) {
+ throw new SandeshaException(ex.getMessage());
+ }
- //Starting the invoker if stopped.
-
SandeshaUtil.startInvokerIfStopped(msgCtx.getConfigurationContext());
+ //Starting the invoker if stopped.
+ SandeshaUtil
+
.startInvokerIfStopped(msgCtx.getConfigurationContext());
- }
}
-// try {
-// MessageContext requestMessage =
rmMsgCtx.getMessageContext()
-//
.getOperationContext().getMessageContext(
-//
WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-// String requestMessageId = requestMessage.getMessageID();
-// SequencePropertyBean checkResponseBean =
seqPropMgr.retrieve(
-// requestMessageId,
-//
Sandesha2Constants.SequenceProperties.CHECK_RESPONSE);
-// if (checkResponseBean != null) {
-// checkResponseBean.setValue(msgCtx);
-// seqPropMgr.update(checkResponseBean);
-// }
-//
-// } catch (AxisFault e) {
-// throw new SandeshaException(e.getMessage());
-// }
-
- applicationMsgTransaction.commit();
+ invokeTransaction.commit();
+
+ //Sending acknowledgements
+ sendAckIfNeeded(rmMsgCtx, messagesStr);
+
}
//TODO convert following from INT to LONG
@@ -272,7 +275,7 @@
SequencePropertyBean acksToBean =
seqPropMgr.retrieve(sequenceId,
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
- EndpointReference acksTo = new EndpointReference
(acksToBean.getValue());
+ EndpointReference acksTo = new
EndpointReference(acksToBean.getValue());
String acksToStr = acksTo.getAddress();
if (acksToStr == null || messagesStr == null)
@@ -311,9 +314,10 @@
MessageContext ackMsgCtx =
SandeshaUtil.createNewRelatedMessageContext(
rmMsgCtx, ackOperation);
-
-
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-
+
+
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
+ "true");
+
RMMsgContext ackRMMsgCtx =
MsgInitializer.initializeMessage(ackMsgCtx);
ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
@@ -351,10 +355,11 @@
}
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-
org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+
org.apache.axis2.Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_TRUE);
-
rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN,
- "true");
+ rmMsgCtx.getMessageContext().setProperty(
+ Sandesha2Constants.ACK_WRITTEN, "true");
try {
engine.send(ackRMMsgCtx.getMessageContext());
} catch (AxisFault e1) {
@@ -362,6 +367,8 @@
}
} else {
+ Transaction asyncAckTransaction =
storageManager.getTransaction();
+
SenderBeanMgr retransmitterBeanMgr = storageManager
.getRetransmitterBeanMgr();
@@ -380,15 +387,15 @@
RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
- long ackInterval =
PropertyManager.getInstance().getAcknowledgementInterval();
+ long ackInterval = PropertyManager.getInstance()
+ .getAcknowledgementInterval();
if (policyBean != null) {
ackInterval =
policyBean.getAcknowledgementInaterval();
}
-
+
//Ack will be sent as stand alone, only after the
retransmitter
// interval.
long timeToSend = System.currentTimeMillis() +
ackInterval;
- ackBean.setTimeToSend(timeToSend);
//removing old acks.
SenderBean findBean = new SenderBean();
@@ -398,14 +405,19 @@
findBean.setReSend(false);
Collection coll = retransmitterBeanMgr.find(findBean);
Iterator it = coll.iterator();
- while (it.hasNext()) {
- SenderBean retransmitterBean = (SenderBean) it
- .next();
-
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+ if (it.hasNext()) {
+ SenderBean oldAckBean = (SenderBean) it.next();
+ timeToSend = oldAckBean.getTimeToSend();
//If there is an old ack. This ack will be sent in the old timeToSend.
+
retransmitterBeanMgr.delete(oldAckBean.getMessageID());
}
+
+ ackBean.setTimeToSend(timeToSend);
//inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
+
+ asyncAckTransaction.commit();
SandeshaUtil.startSenderIfStopped(configCtx);
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Tue Dec 27 05:06:25 2005
@@ -33,6 +33,7 @@
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.Accept;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -215,6 +216,8 @@
retransmitterMgr.update(tempBean);
}
+
SequenceManager.updateLastActivatedTime(newOutSequenceId,configCtx);
+
updateAppMessagesTransaction.commit();
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Tue Dec 27 05:06:25 2005
@@ -28,6 +28,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.TerminateSequence;
@@ -70,6 +71,8 @@
TerminateManager.terminateReceivingSide(context,sequenceId);
terminateTransaction.commit();
+
+ SequenceManager.updateLastActivatedTime(sequenceId,context);
//terminateSeqMsg.pause();
terminateSeqRMMSg.getMessageContext().setPausedTrue(new QName
(Sandesha2Constants.IN_HANDLER_NAME));
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
Tue Dec 27 05:06:25 2005
@@ -47,6 +47,8 @@
if (context != null)
this.context = context;
}
+
+ public abstract void initStorage ();
public abstract Transaction getTransaction();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
Tue Dec 27 05:06:25 2005
@@ -79,6 +79,9 @@
if (bean.getSequenceID() != null
&&
!bean.getSequenceID().equals(temp.getSequenceID()))
select = false;
+
+ if (bean.isInvoked()!=temp.isInvoked())
+ select = false;
if (select)
beans.add(temp);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
Tue Dec 27 05:06:25 2005
@@ -16,17 +16,13 @@
*/
package org.apache.sandesha2.storage.inmemory;
-import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import org.apache.axis2.context.AbstractContext;
import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.storage.RetransmitterBeanMgrTest;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
Tue Dec 27 05:06:25 2005
@@ -74,4 +74,8 @@
return instance;
}
+
+ public void initStorage () {
+
+ }
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
Tue Dec 27 05:06:25 2005
@@ -82,7 +82,13 @@
baseInterval);
}
- retransmitterBean.setTimeToSend(lastSentTime + newInterval);
+ long newTimeToSend = 0;
+ //newTimeToSend = lastSentTime + newInterval;
+
+ long timeNow = System.currentTimeMillis();
+ newTimeToSend = timeNow + newInterval;
+
+ retransmitterBean.setTimeToSend(newTimeToSend);
return retransmitterBean;
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
Tue Dec 27 05:06:25 2005
@@ -9,12 +9,15 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.NextMsgBean;
@@ -97,6 +100,8 @@
// message to invoke
//this will apply for only in-order invocations.
+
updateLastActivatedTime(sequenceId,createSequenceMsg.getMessageContext().getConfigurationContext());
+
return sequenceId;
}
@@ -140,4 +145,71 @@
seqPropMgr.insert(acksToBean);
}
+
+ public static void updateLastActivatedTime (String sequenceID,
ConfigurationContext configContext) throws SandeshaException {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
+ Transaction lastActivatedTransaction =
storageManager.getTransaction();
+ SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean lastActivatedBean =
sequencePropertyBeanMgr.retrieve(sequenceID,
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+
+ boolean added = false;
+
+ if (lastActivatedBean==null) {
+ added = true;
+ lastActivatedBean = new SequencePropertyBean ();
+ lastActivatedBean.setSequenceID(sequenceID);
+
lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ }
+
+ long currentTime = System.currentTimeMillis();
+ lastActivatedBean.setValue(Long.toString(currentTime));
+
+ if (added)
+ sequencePropertyBeanMgr.insert(lastActivatedBean);
+ else
+ sequencePropertyBeanMgr.update(lastActivatedBean);
+
+ lastActivatedTransaction.commit();
+ }
+
+ public static long getLastActivatedTime (String sequenceID,
ConfigurationContext configContext) throws SandeshaException {
+
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean lastActivatedBean =
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+
+ long lastActivatedTime = -1;
+
+ if (lastActivatedBean!=null) {
+ lastActivatedTime =
Long.parseLong(lastActivatedBean.getValue());
+ }
+
+ return lastActivatedTime;
+ }
+
+ public static boolean hasSequenceTimedOut (String sequenceID,
RMMsgContext rmMsgCtx) throws SandeshaException {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+ SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropretyBeanMgr();
+
+ RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+ .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+ if (policyBean == null) {
+ //loading default policies.
+ policyBean =
PropertyManager.getInstance().getRMPolicyBean();
+ }
+
+ boolean sequenceTimedOut = false;
+
+ SequencePropertyBean lastActivatedBean =
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ if (lastActivatedBean!=null) {
+ long lastActivatedTime =
Long.parseLong(lastActivatedBean.getValue());
+ long timeNow = System.currentTimeMillis();
+ if
(lastActivatedTime+policyBean.getInactiveTimeoutInterval()<timeNow)
+ sequenceTimedOut = true;
+ }
+
+ return sequenceTimedOut;
+ }
+
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
Tue Dec 27 05:06:25 2005
@@ -182,6 +182,7 @@
Sequence sequence =
(Sequence) rmMsg
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if
(sequence.getLastMessage() != null) {
+
TerminateManager.terminateAfterInvocation(
context, sequenceId);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Tue
Dec 27 05:06:25 2005
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Iterator;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -25,9 +26,9 @@
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
@@ -37,11 +38,14 @@
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.TerminateSequence;
/**
- * This is responsible for sending and re-sending messages of Sandesha2. This
represent a thread that keep running all
- * the time. This keep looking at the Sender table to find out any entries
that should be sent.
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represents a thread that keeps running all the time. It keeps looking at the
+ * Sender table to find any entries that should be sent.
*
* @author Chamikara Jayalath <[EMAIL PROTECTED]>
*/
@@ -63,29 +67,32 @@
public void run() {
StorageManager storageManager = null;
-
+
try {
- storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
+ storageManager =
SandeshaUtil.getSandeshaStorageManager(context);
} catch (SandeshaException e2) {
// TODO Auto-generated catch block
System.out.println("ERROR: Could not start sender");
e2.printStackTrace();
return;
}
-
+
while (senderStarted) {
try {
if (context == null)
throw new SandeshaException(
"Can't continue the
Sender. Context is null");
+ Transaction pickMessagesToSendTransaction =
storageManager.getTransaction(); //starting
+
// a
+
//
new
+
//
transaction
- Transaction sendTransaction =
storageManager.getTransaction(); //starting a new transaction
-
- SenderBeanMgr mgr = storageManager
- .getRetransmitterBeanMgr();
+ SenderBeanMgr mgr =
storageManager.getRetransmitterBeanMgr();
Collection coll = mgr.findMsgsToSend();
+
+ pickMessagesToSendTransaction.commit();
+
Iterator iter = coll.iterator();
while (iter.hasNext()) {
@@ -96,9 +103,10 @@
.getStoredMessageContext(key);
try {
-
- if (msgCtx==null) {
-
System.out.println("ERROR: Sender has an Unavailable Message entry");
+
+ if (msgCtx == null) {
+ System.out
+
.println("ERROR: Sender has an Unavailable Message entry");
break;
}
RMMsgContext rmMsgCtx =
MsgInitializer
@@ -121,57 +129,100 @@
+ "' message.");
}
}
+
+ Transaction preSendTransaction
= storageManager.getTransaction();
- if (rmMsgCtx.getMessageType()
== Sandesha2Constants.MessageTypes.APPLICATION) {
+ int messageType =
rmMsgCtx.getMessageType();
+
+ if (messageType ==
Sandesha2Constants.MessageTypes.APPLICATION) {
+
+ Sequence sequence =
(Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceID =
sequence.getIdentifier().getIdentifier();
+ //checking whether the
sequence has been timed out.
+ boolean
sequenceTimedOut = SequenceManager.hasSequenceTimedOut (sequenceID, rmMsgCtx);;
+ if (sequenceTimedOut) {
+ //sequence has
been timed out.
+ //do time out
processing.
+
+
TerminateManager.terminateSendingSide(context,sequenceID);
+ throw new
SandeshaException ("Sequence timed out");
+ }
+
//piggybacking if an
ack if available for the same
// sequence.
AcknowledgementManager
.piggybackAckIfPresent(rmMsgCtx);
+
}
+
+ preSendTransaction.commit();
try {
- AxisEngine engine = new
AxisEngine (msgCtx.getConfigurationContext());
- engine.send(msgCtx);
-// if (msgCtx.isPaused())
-//
engine.resumeSend(msgCtx);
-// else
-//
engine.send(msgCtx);
+ AxisEngine engine = new
AxisEngine(msgCtx
+
.getConfigurationContext());
+ engine.send(msgCtx);
+ //
if (msgCtx.isPaused())
+ //
engine.resumeSend(msgCtx);
+ //
else
+ //
engine.send(msgCtx);
+
} catch (Exception e) {
//Exception is sending.
retry later
System.out
.println("Exception thrown in sending...");
e.printStackTrace();
+ //e.printStackTrace();
+
}
+
+ Transaction postSendTransaction
= storageManager.getTransaction();
MessageRetransmissionAdjuster
retransmitterAdjuster = new MessageRetransmissionAdjuster();
+
+ if (rmMsgCtx.getMessageType()
== Sandesha2Constants.MessageTypes.APPLICATION) {
+ Sequence sequence =
(Sequence) rmMsgCtx
+
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ long messageNo =
sequence.getMessageNumber()
+
.getMessageNumber();
+ }
+
retransmitterAdjuster.adjustRetransmittion(bean);
-// mgr.update(bean);
-
- if (bean.isReSend())
- mgr.update(bean);
- else
-
mgr.delete(bean.getMessageID());
-
- sendTransaction.commit();
//commiting the current transaction
+ //update or delete only if the
object is still present.
+ SenderBean bean1 =
mgr.retrieve(bean.getMessageID());
+ if (bean1 != null) {
+ if (bean.isReSend())
+
mgr.update(bean);
+ else
+
mgr.delete(bean.getMessageID());
+ }
+
+ postSendTransaction.commit();
//committing the current
+
// transaction
- Transaction
processResponseTransaction = storageManager.getTransaction();
+ Transaction
processResponseTransaction =
+ storageManager.getTransaction();
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
-
+
processResponseTransaction.commit();
-
- Transaction
terminateCleaningTransaction = storageManager.getTransaction();
- if
(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+
+ Transaction
terminateCleaningTransaction = storageManager
+
.getTransaction();
+ if (rmMsgCtx.getMessageType()
== Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
//terminate sending
side.
- TerminateSequence
terminateSequence = (TerminateSequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- String sequenceID =
terminateSequence.getIdentifier().getIdentifier();
- ConfigurationContext
configContext = msgCtx.getConfigurationContext();
-
-
TerminateManager.terminateSendingSide(configContext,sequenceID);
+ TerminateSequence
terminateSequence = (TerminateSequence) rmMsgCtx
+
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID =
terminateSequence
+
.getIdentifier().getIdentifier();
+ ConfigurationContext
configContext = msgCtx
+
.getConfigurationContext();
+
+
TerminateManager.terminateSendingSide(
+
configContext, sequenceID);
}
-
+
terminateCleaningTransaction.commit();
} catch (AxisFault e1) {
@@ -179,30 +230,15 @@
} catch (Exception e3) {
e3.printStackTrace();
}
-
- //changing the values of the sent bean.
-
//bean.setLastSentTime(System.currentTimeMillis());
- //bean.setSentCount(bean.getSentCount()
+ 1);
-
- //update if resend=true otherwise
delete. (reSend=false
- // means
- // send only once).
-// if (bean.isReSend())
-// mgr.update(bean);
-// else
-// mgr.delete(bean.getMessageID());
-
}
-
-
-
+
} catch (SandeshaException e) {
e.printStackTrace();
return;
}
try {
- Thread.sleep(2000);
+
Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
} catch (InterruptedException e1) {
//e1.printStackTrace();
System.out.println("Sender was interupted...");
@@ -248,58 +284,61 @@
}
- private void checkForSyncResponses(MessageContext msgCtx) {
+ private void checkForSyncResponses(MessageContext msgCtx) {
try {
- boolean responsePresent = (msgCtx
- .getProperty(MessageContext.TRANSPORT_IN) !=
null);
+ boolean responsePresent = (msgCtx
+
.getProperty(MessageContext.TRANSPORT_IN) != null);
- if (responsePresent) {
- //create the response
- MessageContext response = new MessageContext(msgCtx
- .getConfigurationContext(),
msgCtx.getSessionContext(), msgCtx
- .getTransportIn(),
msgCtx.getTransportOut());
- response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
-
.getProperty(MessageContext.TRANSPORT_IN));
-
- response.setServerSide(false);
-
- //If request is REST we assume the response is REST, so
set the
- // variable
- response.setDoingREST(msgCtx.isDoingREST());
- response
-
.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
-
response.setServiceGroupContext(msgCtx.getServiceGroupContext());
- response.setServiceContext(msgCtx.getServiceContext());
- response.setAxisService(msgCtx.getAxisService());
-
response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
-
- //setting the in-flow.
- //ArrayList inPhaseHandlers =
- //
response.getAxisOperation().getRemainingPhasesInFlow();
- /*
- * if (inPhaseHandlers==null ||
inPhaseHandlers.isEmpty()) {
- * ArrayList phases =
- *
msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
- *
response.getAxisOperation().setRemainingPhasesInFlow(phases); }
- */
-
- //Changed following from TransportUtils to SandeshaUtil
since op.
- // context is anavailable.
- SOAPEnvelope resenvelope = null;
- resenvelope = SandeshaUtil.createSOAPMessage(response,
msgCtx
-
.getEnvelope().getNamespace().getName());
-
-
- if (resenvelope != null) {
- AxisEngine engine = new
AxisEngine(msgCtx.getConfigurationContext());
- response.setEnvelope(resenvelope);
- engine.receive(response);
+ if (responsePresent) {
+ //create the response
+ MessageContext response = new
MessageContext(msgCtx
+ .getConfigurationContext(),
msgCtx.getSessionContext(),
+ msgCtx.getTransportIn(),
msgCtx.getTransportOut());
+
response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+
.getProperty(MessageContext.TRANSPORT_IN));
+
+ response.setServerSide(false);
+
+ //If request is REST we assume the response is
REST, so set the
+ // variable
+ response.setDoingREST(msgCtx.isDoingREST());
+ response.setServiceGroupContextId(msgCtx
+ .getServiceGroupContextId());
+ response
+
.setServiceGroupContext(msgCtx.getServiceGroupContext());
+
response.setServiceContext(msgCtx.getServiceContext());
+
response.setAxisService(msgCtx.getAxisService());
+
response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
+
+ //setting the in-flow.
+ //ArrayList inPhaseHandlers =
+ //
response.getAxisOperation().getRemainingPhasesInFlow();
+ /*
+ * if (inPhaseHandlers==null ||
inPhaseHandlers.isEmpty()) {
+ * ArrayList phases =
+ *
msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
+ *
response.getAxisOperation().setRemainingPhasesInFlow(phases); }
+ */
+
+ //Changed following from TransportUtils to
SandeshaUtil since
+ // op.
+ // context is unavailable.
+ SOAPEnvelope resenvelope = null;
+ resenvelope =
SandeshaUtil.createSOAPMessage(response, msgCtx
+
.getEnvelope().getNamespace().getName());
+
+ if (resenvelope != null) {
+ AxisEngine engine = new
AxisEngine(msgCtx
+
.getConfigurationContext());
+ response.setEnvelope(resenvelope);
+ engine.receive(response);
+ }
}
- }
-
- }catch (Exception e) {
- System.out.println("Exception was throws in processing
the sync response...");
+
+ } catch (Exception e) {
+ System.out
+ .println("Exception was throws in
processing the sync response...");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]