Author: chamikara
Date: Tue Dec 27 05:06:25 2005
New Revision: 359208

URL: http://svn.apache.org/viewcvs?rev=359208&view=rev
Log:
Bug fixes.
Corrrected inactivity timeout logic.
Some changes to improve the preformance.
Corrections in the transaction logic.

Modified:
    
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java 
Tue Dec 27 05:06:25 2005
@@ -23,6 +23,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -41,7 +42,8 @@
 public class AcknowledgementManager {
 
        /**
-        * Piggybacks any available acks of the same sequence to the given 
application message.
+        * Piggybacks any available acks of the same sequence to the given
+        * application message.
         * 
         * @param applicationRMMsgContext
         * @throws SandeshaException
@@ -52,6 +54,7 @@
                                .getMessageContext().getConfigurationContext();
                StorageManager storageManager = SandeshaUtil
                                
.getSandeshaStorageManager(configurationContext);
+
                SenderBeanMgr retransmitterBeanMgr = storageManager
                                .getRetransmitterBeanMgr();
                SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager
@@ -68,7 +71,8 @@
                String sequenceId = sequence.getIdentifier().getIdentifier();
 
                SequencePropertyBean internalSequenceBean = 
sequencePropertyBeanMgr
-                               .retrieve(sequenceId,
+                               .retrieve(
+                                               sequenceId,
                                                
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
                if (internalSequenceBean == null)
                        throw new SandeshaException("Temp Sequence is not set");
@@ -82,27 +86,32 @@
                Iterator it = collection.iterator();
 
                if (it.hasNext()) {
+
                        SenderBean ackBean = (SenderBean) it.next();
 
-                       //deleting the ack entry.
-                       retransmitterBeanMgr.delete(ackBean.getMessageID());
+                       long timeNow = System.currentTimeMillis();
+                       if (ackBean.getTimeToSend() > timeNow) { //Piggybacking 
will happen only if the end of ack interval (timeToSend) is not reached.
 
-                       //Adding the ack to the application message
-                       MessageContext ackMsgContext = SandeshaUtil
-                                       
.getStoredMessageContext(ackBean.getMessageContextRefKey());
-                       RMMsgContext ackRMMsgContext = MsgInitializer
-                                       .initializeMessage(ackMsgContext);
-                       if (ackRMMsgContext.getMessageType() != 
Sandesha2Constants.MessageTypes.ACK)
-                               throw new SandeshaException("Invalid ack 
message entry");
-
-                       SequenceAcknowledgement sequenceAcknowledgement = 
(SequenceAcknowledgement) ackRMMsgContext
-                                       
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-                       applicationRMMsgContext.setMessagePart(
-                                       
Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
-                                       sequenceAcknowledgement);
+                               //deleting the ack entry.
+                               
retransmitterBeanMgr.delete(ackBean.getMessageID());
 
-                       applicationRMMsgContext.addSOAPEnvelope();
-               }
+                               //Adding the ack to the application message
+                               MessageContext ackMsgContext = SandeshaUtil
+                                               .getStoredMessageContext(ackBean
+                                                               
.getMessageContextRefKey());
+                               RMMsgContext ackRMMsgContext = MsgInitializer
+                                               
.initializeMessage(ackMsgContext);
+                               if (ackRMMsgContext.getMessageType() != 
Sandesha2Constants.MessageTypes.ACK)
+                                       throw new SandeshaException("Invalid 
ack message entry");
+
+                               SequenceAcknowledgement sequenceAcknowledgement 
= (SequenceAcknowledgement) ackRMMsgContext
+                                               
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+                               applicationRMMsgContext.setMessagePart(
+                                               
Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+                                               sequenceAcknowledgement);
 
+                               applicationRMMsgContext.addSOAPEnvelope();
+                       }
+               }
        }
 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java 
Tue Dec 27 05:06:25 2005
@@ -178,6 +178,8 @@
                String OFFERED_SEQUENCE = "OfferedSequence";
 
                String TERMINATE_ADDED = "TerminateAdded";
+               
+               String LAST_ACTIVATED_TIME = "LastActivatedTime";
        }
 
        public interface SOAPVersion {
@@ -298,11 +300,11 @@
 
        int INVOKER_SLEEP_TIME = 1000;
 
-       int SENDER_SLEEP_TIME = 1000;
+       int SENDER_SLEEP_TIME = 500;
 
        int CLIENT_SLEEP_TIME = 10000;
 
-       int TERMINATE_DELAY = 1000;
+       int TERMINATE_DELAY = 100;
 
        String TEMP_SEQUENCE_ID = "uuid:tempID";
 

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Tue 
Dec 27 05:06:25 2005
@@ -18,8 +18,11 @@
 package org.apache.sandesha2;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.modules.Module;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
 
 /**
  * The Module class of Sandesha2.
@@ -32,12 +35,21 @@
 
        // initialize the module
        public void init(AxisConfiguration axisSystem) throws AxisFault {
-
+               cleanStorage (axisSystem);
        }
 
        // shutdown the module
        public void shutdown(AxisConfiguration axisSystem) throws AxisFault {
 
+       }
+       
+       private void cleanStorage (AxisConfiguration axisSystem) throws 
AxisFault {
+               
+               ConfigurationContext configurationContext = new 
ConfigurationContext (axisSystem);
+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext);
+               
+               storageManager.initStorage();
+               
        }
        
 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java 
Tue Dec 27 05:06:25 2005
@@ -85,12 +85,12 @@
         */
        public static void terminateAfterInvocation (ConfigurationContext 
configContext, String sequenceID) throws SandeshaException {
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);
-               SequencePropertyBeanMgr sequencePropertyBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
                InvokerBeanMgr storageMapBeanMgr = 
storageManager.getStorageMapBeanMgr();
 
                //removing storageMap entries
                InvokerBean findStorageMapBean = new InvokerBean ();
                findStorageMapBean.setSequenceID(sequenceID);
+               findStorageMapBean.setInvoked(true);
                Collection collection = 
storageMapBeanMgr.find(findStorageMapBean);
                Iterator iterator = collection.iterator();
                while (iterator.hasNext()) {
@@ -98,6 +98,13 @@
                        
storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
                }
                
+               removeReceivingSideProperties(configContext,sequenceID);
+
+       }
+       
+       private static void removeReceivingSideProperties (ConfigurationContext 
configContext, String sequenceID) throws SandeshaException {
+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);
+               SequencePropertyBeanMgr sequencePropertyBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
                SequencePropertyBean allSequenceBean = 
sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
                ArrayList allSequenceList = 
SandeshaUtil.getArrayListFromString(allSequenceBean.getValue());
                allSequenceList.remove(sequenceID);

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 Tue Dec 27 05:06:25 2005
@@ -218,9 +218,23 @@
                                        acksTo = (String) msgCtx
                                                        
.getProperty(Sandesha2ClientAPI.AcksTo);
                                }
+                               
+                               if (msgCtx.isServerSide()) {
+                                       //we do not set acksTo value to 
anonymous when the create sequence is send from the server.
+                                       
+                                       MessageContext requestMessage = 
operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+                                       if (requestMessage==null) {
+                                               throw new SandeshaException 
("Request message is not present");
+                                       }
+                                       
+                                       acksTo = 
requestMessage.getTo().getAddress();
+                                       
+                               } else {
+                                       if (acksTo == null)
+                                               acksTo = 
Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
+                               }
+
 
-                               if (acksTo == null)
-                                       acksTo = 
Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
 
                                //If acksTo is not anonymous. Start the listner 
TODO: verify
                                if 
(!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Tue Dec 27 05:06:25 2005
@@ -36,6 +36,7 @@
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.Nack;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -80,6 +81,9 @@
                if (outSequenceId == null || "".equals(outSequenceId))
                        throw new SandeshaException("OutSequenceId is null");
 
+               //updating the last activated time of the sequence.
+               
SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
+               
                SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
                                outSequenceId, 
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
 
@@ -154,8 +158,6 @@
                                addTerminateSequenceMessage(rmMsgCtx, 
outSequenceId,
                                                internalSequenceId);
                        }
-
-
                }
                
                //stopping the progress of the message further.

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Tue Dec 27 05:06:25 2005
@@ -53,6 +53,7 @@
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.LastMessage;
 import org.apache.sandesha2.wsrm.Sequence;
@@ -83,23 +84,28 @@
                if (msgCtx == null)
                        throw new SandeshaException("Message context is null");
 
-               if 
(rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
-                               && 
rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE)
-                                               .equals("true")) {
+               if (rmMsgCtx
+                               
.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+                               && rmMsgCtx.getProperty(
+                                               
Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals(
+                                               "true")) {
                        return;
                }
 
-               //RM will not rend sync responses. If sync acks are there this 
will be made true again later.
-               if(rmMsgCtx.getMessageContext().getOperationContext()!=null) {
-                       
rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,Constants.VALUE_FALSE);
+               //RM will not rend sync responses. If sync acks are there this 
will be
+               // made true again later.
+               if (rmMsgCtx.getMessageContext().getOperationContext() != null) 
{
+                       
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+                                       Constants.RESPONSE_WRITTEN, 
Constants.VALUE_FALSE);
                }
-               
+
                StorageManager storageManager = SandeshaUtil
                                
.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
                                                .getConfigurationContext());
-               
-               Transaction applicationMsgTransaction = 
storageManager.getTransaction();
-               
+
+               Transaction updataMsgStringTransaction = storageManager
+                               .getTransaction();
+
                SequencePropertyBeanMgr seqPropMgr = storageManager
                                .getSequencePropretyBeanMgr();
 
@@ -112,6 +118,9 @@
                if (configCtx == null)
                        throw new SandeshaException("Configuration Context is 
null");
 
+               //updating the last activated time of the sequence.
+               SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
+               
                SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
                                
Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
 
@@ -127,7 +136,8 @@
                        // EXACTLY_ONCE.
 
                        //msgCtx.pause();
-                       rmMsgCtx.getMessageContext().setPausedTrue(new QName 
(Sandesha2Constants.IN_HANDLER_NAME));
+                       rmMsgCtx.getMessageContext().setPausedTrue(
+                                       new 
QName(Sandesha2Constants.IN_HANDLER_NAME));
 
                }
 
@@ -139,7 +149,11 @@
                msgsBean.setValue(messagesStr);
                seqPropMgr.update(msgsBean);
 
-               sendAckIfNeeded(rmMsgCtx, messagesStr);
+               updataMsgStringTransaction.commit();
+
+
+
+               Transaction invokeTransaction = storageManager.getTransaction();
 
                //      Pause the messages bean if not the right message to 
invoke.
                NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
@@ -152,82 +166,71 @@
 
                long nextMsgno = bean.getNextMsgNoToProcess();
 
-               if (msgCtx.isServerSide()) {
-                       boolean inOrderInvocation = 
PropertyManager.getInstance().isInOrderInvocation();
-                       if (inOrderInvocation) {
-                               //pause the message
-                               //msgCtx.pause();
-                               rmMsgCtx.getMessageContext().setPausedTrue(new 
QName (Sandesha2Constants.IN_HANDLER_NAME));
-                               SequencePropertyBean incomingSequenceListBean = 
(SequencePropertyBean) seqPropMgr
-                                               .retrieve(
-                                                               
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-                                                               
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
-                               if (incomingSequenceListBean == null) {
-                                       ArrayList incomingSequenceList = new 
ArrayList();
-                                       incomingSequenceListBean = new 
SequencePropertyBean();
-                                       incomingSequenceListBean
-                                                       
.setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
-                                       incomingSequenceListBean
-                                                       
.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-                                       
incomingSequenceListBean.setValue(incomingSequenceList.toString());
+               boolean inOrderInvocation = PropertyManager.getInstance()
+                               .isInOrderInvocation();
+               if (inOrderInvocation) {
+                       //pause the message
+                       //msgCtx.pause();
+                       rmMsgCtx.getMessageContext().setPausedTrue(
+                                       new 
QName(Sandesha2Constants.IN_HANDLER_NAME));
+                       SequencePropertyBean incomingSequenceListBean = 
(SequencePropertyBean) seqPropMgr
+                                       .retrieve(
+                                                       
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+                                                       
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+                       if (incomingSequenceListBean == null) {
+                               ArrayList incomingSequenceList = new 
ArrayList();
+                               incomingSequenceListBean = new 
SequencePropertyBean();
+                               incomingSequenceListBean
+                                               
.setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+                               incomingSequenceListBean
+                                               
.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+                               
incomingSequenceListBean.setValue(incomingSequenceList
+                                               .toString());
 
-                                       
seqPropMgr.insert(incomingSequenceListBean);
-                               }
+                               seqPropMgr.insert(incomingSequenceListBean);
+                       }
 
-                               ArrayList incomingSequenceList = 
SandeshaUtil.getArrayListFromString(incomingSequenceListBean
-                                               .getValue());
+                       ArrayList incomingSequenceList = SandeshaUtil
+                                       
.getArrayListFromString(incomingSequenceListBean.getValue());
 
-                               //Adding current sequence to the incoming 
sequence List.
-                               if (!incomingSequenceList.contains(sequenceId)) 
{
-                                       incomingSequenceList.add(sequenceId);
-                                       
-                                       //saving the property.
-                                       
incomingSequenceListBean.setValue(incomingSequenceList.toString());
-                                       
seqPropMgr.insert(incomingSequenceListBean);
-                               }
+                       //Adding current sequence to the incoming sequence List.
+                       if (!incomingSequenceList.contains(sequenceId)) {
+                               incomingSequenceList.add(sequenceId);
+
+                               //saving the property.
+                               
incomingSequenceListBean.setValue(incomingSequenceList
+                                               .toString());
+                               seqPropMgr.insert(incomingSequenceListBean);
+                       }
 
-                               //saving the message.
-                               try {
-                                       String key = 
SandeshaUtil.storeMessageContext(rmMsgCtx
-                                                       .getMessageContext());
-                                       storageMapMgr.insert(new 
InvokerBean(key, msgNo,
-                                                       sequenceId));
-
-                                       //This will avoid performing 
application processing more
-                                       // than
-                                       // once.
-                                       
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
-                                                       "true");
+                       //saving the message.
+                       try {
+                               String key = 
SandeshaUtil.storeMessageContext(rmMsgCtx
+                                               .getMessageContext());
+                               storageMapMgr.insert(new InvokerBean(key, 
msgNo, sequenceId));
+
+                               //This will avoid performing application 
processing more
+                               // than
+                               // once.
+                               rmMsgCtx.setProperty(
+                                               
Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
-                               } catch (Exception ex) {
-                                       throw new 
SandeshaException(ex.getMessage());
-                               }
+                       } catch (Exception ex) {
+                               throw new SandeshaException(ex.getMessage());
+                       }
 
-                               //Starting the invoker if stopped.
-                               
SandeshaUtil.startInvokerIfStopped(msgCtx.getConfigurationContext());
+                       //Starting the invoker if stopped.
+                       SandeshaUtil
+                                       
.startInvokerIfStopped(msgCtx.getConfigurationContext());
 
-                       }
                }
 
-//             try {
-//                     MessageContext requestMessage = 
rmMsgCtx.getMessageContext()
-//                                     
.getOperationContext().getMessageContext(
-//                                                     
WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-//                     String requestMessageId = requestMessage.getMessageID();
-//                     SequencePropertyBean checkResponseBean = 
seqPropMgr.retrieve(
-//                                     requestMessageId,
-//                                     
Sandesha2Constants.SequenceProperties.CHECK_RESPONSE);
-//                     if (checkResponseBean != null) {
-//                             checkResponseBean.setValue(msgCtx);
-//                             seqPropMgr.update(checkResponseBean);
-//                     }
-//
-//             } catch (AxisFault e) {
-//                     throw new SandeshaException(e.getMessage());
-//             }
-               
-               applicationMsgTransaction.commit();
+               invokeTransaction.commit();
+
+               //Sending acknowledgements
+               sendAckIfNeeded(rmMsgCtx, messagesStr);
+
        }
 
        //TODO convert following from INT to LONG
@@ -272,7 +275,7 @@
                SequencePropertyBean acksToBean = 
seqPropMgr.retrieve(sequenceId,
                                
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
 
-               EndpointReference acksTo = new EndpointReference 
(acksToBean.getValue());
+               EndpointReference acksTo = new 
EndpointReference(acksToBean.getValue());
                String acksToStr = acksTo.getAddress();
 
                if (acksToStr == null || messagesStr == null)
@@ -311,9 +314,10 @@
 
                MessageContext ackMsgCtx = 
SandeshaUtil.createNewRelatedMessageContext(
                                rmMsgCtx, ackOperation);
-               
-               
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-               
+
+               
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
+                               "true");
+
                RMMsgContext ackRMMsgCtx = 
MsgInitializer.initializeMessage(ackMsgCtx);
 
                ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
@@ -351,10 +355,11 @@
                        }
 
                        
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-                                       
org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+                                       
org.apache.axis2.Constants.RESPONSE_WRITTEN,
+                                       Constants.VALUE_TRUE);
 
-                       
rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN,
-                                       "true");
+                       rmMsgCtx.getMessageContext().setProperty(
+                                       Sandesha2Constants.ACK_WRITTEN, "true");
                        try {
                                engine.send(ackRMMsgCtx.getMessageContext());
                        } catch (AxisFault e1) {
@@ -362,6 +367,8 @@
                        }
                } else {
 
+                       Transaction asyncAckTransaction = 
storageManager.getTransaction();
+
                        SenderBeanMgr retransmitterBeanMgr = storageManager
                                        .getRetransmitterBeanMgr();
 
@@ -380,15 +387,15 @@
 
                        RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
                                        
.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-                       long ackInterval = 
PropertyManager.getInstance().getAcknowledgementInterval();
+                       long ackInterval = PropertyManager.getInstance()
+                                       .getAcknowledgementInterval();
                        if (policyBean != null) {
                                ackInterval = 
policyBean.getAcknowledgementInaterval();
                        }
-
+                       
                        //Ack will be sent as stand alone, only after the 
retransmitter
                        // interval.
                        long timeToSend = System.currentTimeMillis() + 
ackInterval;
-                       ackBean.setTimeToSend(timeToSend);
 
                        //removing old acks.
                        SenderBean findBean = new SenderBean();
@@ -398,14 +405,19 @@
                        findBean.setReSend(false);
                        Collection coll = retransmitterBeanMgr.find(findBean);
                        Iterator it = coll.iterator();
-                       while (it.hasNext()) {
-                               SenderBean retransmitterBean = (SenderBean) it
-                                               .next();
-                               
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+                       if (it.hasNext()) {
+                               SenderBean oldAckBean = (SenderBean) it.next();
+                               timeToSend = oldAckBean.getTimeToSend();        
        //If there is an old ack. This ack will be sent in the old timeToSend.
+                               
retransmitterBeanMgr.delete(oldAckBean.getMessageID());
                        }
+                       
+                       ackBean.setTimeToSend(timeToSend);
 
                        //inserting the new ack.
                        retransmitterBeanMgr.insert(ackBean);
+
+                       asyncAckTransaction.commit();
 
                        SandeshaUtil.startSenderIfStopped(configCtx);
                }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Tue Dec 27 05:06:25 2005
@@ -33,6 +33,7 @@
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.Accept;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -215,6 +216,8 @@
                        retransmitterMgr.update(tempBean);
                }
 
+               
SequenceManager.updateLastActivatedTime(newOutSequenceId,configCtx);
+               
                updateAppMessagesTransaction.commit();
                
                
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Tue Dec 27 05:06:25 2005
@@ -28,6 +28,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
@@ -70,6 +71,8 @@
                TerminateManager.terminateReceivingSide(context,sequenceId);
                
                terminateTransaction.commit(); 
+               
+               SequenceManager.updateLastActivatedTime(sequenceId,context);
 
                //terminateSeqMsg.pause();
                terminateSeqRMMSg.getMessageContext().setPausedTrue(new QName 
(Sandesha2Constants.IN_HANDLER_NAME));

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java 
Tue Dec 27 05:06:25 2005
@@ -47,6 +47,8 @@
                if (context != null)
                        this.context = context;
        }
+       
+       public abstract void initStorage ();
 
        public abstract Transaction getTransaction();
 

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 Tue Dec 27 05:06:25 2005
@@ -79,6 +79,9 @@
                        if (bean.getSequenceID() != null
                                        && 
!bean.getSequenceID().equals(temp.getSequenceID()))
                                select = false;
+                       
+                       if (bean.isInvoked()!=temp.isInvoked())
+                               select = false;
 
                        if (select)
                                beans.add(temp);

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 Tue Dec 27 05:06:25 2005
@@ -16,17 +16,13 @@
  */
 package org.apache.sandesha2.storage.inmemory;
 
-import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
 
 import org.apache.axis2.context.AbstractContext;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.storage.RetransmitterBeanMgrTest;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Tue Dec 27 05:06:25 2005
@@ -74,4 +74,8 @@
 
                return instance;
        }
+       
+       public void  initStorage () {
+               
+       }
 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
 Tue Dec 27 05:06:25 2005
@@ -82,7 +82,13 @@
                                        baseInterval);
                }
 
-               retransmitterBean.setTimeToSend(lastSentTime + newInterval);
+               long newTimeToSend = 0;
+               //newTimeToSend = lastSentTime + newInterval;
+               
+               long timeNow = System.currentTimeMillis();
+               newTimeToSend = timeNow + newInterval;
+               
+               retransmitterBean.setTimeToSend(newTimeToSend);
 
                return retransmitterBean;
        }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java 
Tue Dec 27 05:06:25 2005
@@ -9,12 +9,15 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2ClientAPI;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
@@ -97,6 +100,8 @@
                // message to invoke
                //this will apply for only in-order invocations.
 
+               
updateLastActivatedTime(sequenceId,createSequenceMsg.getMessageContext().getConfigurationContext());
+               
                return sequenceId;
        }
 
@@ -140,4 +145,71 @@
                seqPropMgr.insert(acksToBean);
 
        }
+       
+       public static void updateLastActivatedTime (String sequenceID, 
ConfigurationContext configContext) throws SandeshaException {
+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);
+               Transaction lastActivatedTransaction = 
storageManager.getTransaction();
+               SequencePropertyBeanMgr sequencePropertyBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
+               SequencePropertyBean lastActivatedBean = 
sequencePropertyBeanMgr.retrieve(sequenceID, 
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+               
+               boolean added = false;
+               
+               if (lastActivatedBean==null) {
+                       added = true;
+                       lastActivatedBean = new SequencePropertyBean ();
+                       lastActivatedBean.setSequenceID(sequenceID);
+                       
lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+               }
+               
+               long currentTime = System.currentTimeMillis();
+               lastActivatedBean.setValue(Long.toString(currentTime));
+               
+               if (added)
+                       sequencePropertyBeanMgr.insert(lastActivatedBean);
+               else
+                       sequencePropertyBeanMgr.update(lastActivatedBean);
+               
+               lastActivatedTransaction.commit();
+       }
+       
+       public static long getLastActivatedTime (String sequenceID, 
ConfigurationContext configContext) throws SandeshaException {
+               
+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);
+               SequencePropertyBeanMgr seqPropBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
+               
+               SequencePropertyBean lastActivatedBean = 
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+               
+               long lastActivatedTime = -1;
+               
+               if (lastActivatedBean!=null) {
+                       lastActivatedTime = 
Long.parseLong(lastActivatedBean.getValue());
+               }
+               
+               return lastActivatedTime;
+       }
+               
+       public static boolean hasSequenceTimedOut (String sequenceID, 
RMMsgContext rmMsgCtx) throws SandeshaException {
+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+               SequencePropertyBeanMgr seqPropBeanMgr = 
storageManager.getSequencePropretyBeanMgr();
+               
+               RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+                       .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+               if (policyBean == null) {
+                       //loading default policies.
+                       policyBean = 
PropertyManager.getInstance().getRMPolicyBean();
+               }
+
+               boolean sequenceTimedOut = false;
+               
+               SequencePropertyBean lastActivatedBean = 
seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+               if (lastActivatedBean!=null) {
+                       long lastActivatedTime = 
Long.parseLong(lastActivatedBean.getValue());
+                       long timeNow = System.currentTimeMillis();
+                       if 
(lastActivatedTime+policyBean.getInactiveTimeoutInterval()<timeNow)
+                               sequenceTimedOut = true;
+               }
+               
+               return sequenceTimedOut;
+       }
+       
 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java 
Tue Dec 27 05:06:25 2005
@@ -182,6 +182,7 @@
                                                        Sequence sequence = 
(Sequence) rmMsg
                                                                        
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
                                                        if 
(sequence.getLastMessage() != null) {
+                                                               
                                                                
TerminateManager.terminateAfterInvocation(
                                                                                
context, sequenceId);
                                                                

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Tue 
Dec 27 05:06:25 2005
@@ -18,6 +18,7 @@
 
 import java.util.Collection;
 import java.util.Iterator;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -25,9 +26,9 @@
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.soap.SOAPEnvelope;
 import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2ClientAPI;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.TerminateManager;
 import org.apache.sandesha2.storage.StorageManager;
@@ -37,11 +38,14 @@
 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
- * This is responsible for sending and re-sending messages of Sandesha2. This 
represent a thread that keep running all
- * the time. This keep looking at the Sender table to find out any entries 
that should be sent.
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represent a thread that keep running all the time. This keep looking at the
+ * Sender table to find out any entries that should be sent.
  * 
  * @author Chamikara Jayalath <[EMAIL PROTECTED]>
  */
@@ -63,29 +67,32 @@
        public void run() {
 
                StorageManager storageManager = null;
-               
+
                try {
-                       storageManager = SandeshaUtil
-                       .getSandeshaStorageManager(context);
+                       storageManager = 
SandeshaUtil.getSandeshaStorageManager(context);
                } catch (SandeshaException e2) {
                        // TODO Auto-generated catch block
                        System.out.println("ERROR: Could not start sender");
                        e2.printStackTrace();
                        return;
                }
-               
+
                while (senderStarted) {
                        try {
                                if (context == null)
                                        throw new SandeshaException(
                                                        "Can't continue the 
Sender. Context is null");
 
+                               Transaction pickMessagesToSendTransaction = 
storageManager.getTransaction(); //starting
+                                                                               
                                                                           // a
+                                                                               
                                                                           // 
new
+                                                                               
                                                                           // 
transaction
 
-                               Transaction sendTransaction = 
storageManager.getTransaction(); //starting a new transaction
-                               
-                               SenderBeanMgr mgr = storageManager
-                                               .getRetransmitterBeanMgr();
+                               SenderBeanMgr mgr = 
storageManager.getRetransmitterBeanMgr();
                                Collection coll = mgr.findMsgsToSend();
+
+                               pickMessagesToSendTransaction.commit();
+                               
                                Iterator iter = coll.iterator();
 
                                while (iter.hasNext()) {
@@ -96,9 +103,10 @@
                                                        
.getStoredMessageContext(key);
 
                                        try {
-                                               
-                                               if (msgCtx==null) {
-                                                       
System.out.println("ERROR: Sender has an Unavailable Message entry");
+
+                                               if (msgCtx == null) {
+                                                       System.out
+                                                                       
.println("ERROR: Sender has an Unavailable Message entry");
                                                        break;
                                                }
                                                RMMsgContext rmMsgCtx = 
MsgInitializer
@@ -121,57 +129,100 @@
                                                                                
                + "' message.");
                                                        }
                                                }
+                                               
+                                               Transaction preSendTransaction 
= storageManager.getTransaction();
 
-                                               if (rmMsgCtx.getMessageType() 
== Sandesha2Constants.MessageTypes.APPLICATION) {
+                                               int messageType = 
rmMsgCtx.getMessageType();
+                                               
+                                               if (messageType == 
Sandesha2Constants.MessageTypes.APPLICATION) {
+                                                       
+                                                       Sequence sequence = 
(Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+                                                       String sequenceID = 
sequence.getIdentifier().getIdentifier();
+                                                       //checking weather the 
sequence has been timed out.
+                                                       boolean 
sequenceTimedOut = SequenceManager.hasSequenceTimedOut (sequenceID, rmMsgCtx);;
+                                                       if (sequenceTimedOut) {
+                                                               //sequence has 
been timed out.
+                                                               //do time out 
processing.
+                                                               
+                                                               
TerminateManager.terminateSendingSide(context,sequenceID);
+                                                               throw new 
SandeshaException ("Sequence timed out");
+                                                       }
+                                                       
                                                        //piggybacking if an 
ack if available for the same
                                                        // sequence.
                                                        AcknowledgementManager
                                                                        
.piggybackAckIfPresent(rmMsgCtx);
+                                                       
                                                }
+                                               
+                                               preSendTransaction.commit();
 
                                                try {
-                                                       AxisEngine engine = new 
AxisEngine (msgCtx.getConfigurationContext());
-                                                       engine.send(msgCtx);
-//                                                     if (msgCtx.isPaused())
-//                                                             
engine.resumeSend(msgCtx);
-//                                                     else
-//                                                             
engine.send(msgCtx);
                                                        
+                                                       AxisEngine engine = new 
AxisEngine(msgCtx
+                                                                       
.getConfigurationContext());
+                                                       engine.send(msgCtx);
+                                                       //                      
                                if (msgCtx.isPaused())
+                                                       //                      
                                        engine.resumeSend(msgCtx);
+                                                       //                      
                                else
+                                                       //                      
                                        engine.send(msgCtx);
+
                                                } catch (Exception e) {
                                                        //Exception is sending. 
retry later
                                                        System.out
                                                                        
.println("Exception thrown in sending...");
                                                        e.printStackTrace();
+                                                       //e.printStackTrace();
+
                                                }
+                                               
+                                               Transaction postSendTransaction 
= storageManager.getTransaction();
 
                                                MessageRetransmissionAdjuster 
retransmitterAdjuster = new MessageRetransmissionAdjuster();
+
+                                               if (rmMsgCtx.getMessageType() 
== Sandesha2Constants.MessageTypes.APPLICATION) {
+                                                       Sequence sequence = 
(Sequence) rmMsgCtx
+                                                                       
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+                                                       long messageNo = 
sequence.getMessageNumber()
+                                                                       
.getMessageNumber();
+                                               }
+
                                                
retransmitterAdjuster.adjustRetransmittion(bean);
 
-//                                             mgr.update(bean);
-                                               
-                                               if (bean.isReSend())
-                                                       mgr.update(bean);
-                                               else
-                                                       
mgr.delete(bean.getMessageID());
-                                               
-                                               sendTransaction.commit();       
        //commiting the current transaction
+                                               //update or delete only if the 
object is still present.
+                                               SenderBean bean1 = 
mgr.retrieve(bean.getMessageID());
+                                               if (bean1 != null) {
+                                                       if (bean.isReSend())
+                                                               
mgr.update(bean);
+                                                       else
+                                                               
mgr.delete(bean.getMessageID());
+                                               }
+
+                                               postSendTransaction.commit(); 
//commiting the current
+                                                                               
                  // transaction
 
-                                               Transaction 
processResponseTransaction = storageManager.getTransaction();
+                                               Transaction 
processResponseTransaction =
+                                               storageManager.getTransaction();
                                                if (!msgCtx.isServerSide())
                                                        
checkForSyncResponses(msgCtx);
-                                               
+                                                                               
                
                                                
processResponseTransaction.commit();
-                                               
-                                               Transaction 
terminateCleaningTransaction = storageManager.getTransaction();
-                                               if 
(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+
+                                               Transaction 
terminateCleaningTransaction = storageManager
+                                                               
.getTransaction();
+                                               if (rmMsgCtx.getMessageType() 
== Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
                                                        //terminate sending 
side.
-                                                       TerminateSequence 
terminateSequence = (TerminateSequence) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-                                                       String sequenceID = 
terminateSequence.getIdentifier().getIdentifier();
-                                                       ConfigurationContext 
configContext = msgCtx.getConfigurationContext();
-                                                       
-                                                       
TerminateManager.terminateSendingSide(configContext,sequenceID);
+                                                       TerminateSequence 
terminateSequence = (TerminateSequence) rmMsgCtx
+                                                                       
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+                                                       String sequenceID = 
terminateSequence
+                                                                       
.getIdentifier().getIdentifier();
+                                                       ConfigurationContext 
configContext = msgCtx
+                                                                       
.getConfigurationContext();
+
+                                                       
TerminateManager.terminateSendingSide(
+                                                                       
configContext, sequenceID);
                                                }
-                                               
+
                                                
terminateCleaningTransaction.commit();
 
                                        } catch (AxisFault e1) {
@@ -179,30 +230,15 @@
                                        } catch (Exception e3) {
                                                e3.printStackTrace();
                                        }
-
-                                       //changing the values of the sent bean.
-                                       
//bean.setLastSentTime(System.currentTimeMillis());
-                                       //bean.setSentCount(bean.getSentCount() 
+ 1);
-
-                                       //update if resend=true otherwise 
delete. (reSend=false
-                                       // means
-                                       // send only once).
-//                                     if (bean.isReSend())
-//                                             mgr.update(bean);
-//                                     else
-//                                             mgr.delete(bean.getMessageID());
-
                                }
-                               
-                               
-                               
+
                        } catch (SandeshaException e) {
                                e.printStackTrace();
                                return;
                        }
 
                        try {
-                               Thread.sleep(2000);
+                               
Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
                        } catch (InterruptedException e1) {
                                //e1.printStackTrace();
                                System.out.println("Sender was interupted...");
@@ -248,58 +284,61 @@
 
        }
 
-       private void checkForSyncResponses(MessageContext msgCtx)  {
+       private void checkForSyncResponses(MessageContext msgCtx) {
 
                try {
-               boolean responsePresent = (msgCtx
-                               .getProperty(MessageContext.TRANSPORT_IN) != 
null);
+                       boolean responsePresent = (msgCtx
+                                       
.getProperty(MessageContext.TRANSPORT_IN) != null);
 
-               if (responsePresent) {
-                       //create the response
-                       MessageContext response = new MessageContext(msgCtx
-                                       .getConfigurationContext(), 
msgCtx.getSessionContext(), msgCtx
-                                       .getTransportIn(), 
msgCtx.getTransportOut());
-                       response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
-                                       
.getProperty(MessageContext.TRANSPORT_IN));
-
-                       response.setServerSide(false);
-
-                       //If request is REST we assume the response is REST, so 
set the
-                       // variable
-                       response.setDoingREST(msgCtx.isDoingREST());
-                       response
-                                       
.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
-                       
response.setServiceGroupContext(msgCtx.getServiceGroupContext());
-                       response.setServiceContext(msgCtx.getServiceContext());
-                       response.setAxisService(msgCtx.getAxisService());
-                       
response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
-
-                       //setting the in-flow.
-                       //ArrayList inPhaseHandlers =
-                       // 
response.getAxisOperation().getRemainingPhasesInFlow();
-                       /*
-                        * if (inPhaseHandlers==null || 
inPhaseHandlers.isEmpty()) {
-                        * ArrayList phases =
-                        * 
msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
-                        * 
response.getAxisOperation().setRemainingPhasesInFlow(phases); }
-                        */
-
-                       //Changed following from TransportUtils to SandeshaUtil 
since op.
-                       // context is anavailable.
-                       SOAPEnvelope resenvelope = null;
-                       resenvelope = SandeshaUtil.createSOAPMessage(response, 
msgCtx
-                                       
.getEnvelope().getNamespace().getName());
-
-
-                       if (resenvelope != null) {
-                               AxisEngine engine = new 
AxisEngine(msgCtx.getConfigurationContext());
-                               response.setEnvelope(resenvelope);
-                               engine.receive(response);
+                       if (responsePresent) {
+                               //create the response
+                               MessageContext response = new 
MessageContext(msgCtx
+                                               .getConfigurationContext(), 
msgCtx.getSessionContext(),
+                                               msgCtx.getTransportIn(), 
msgCtx.getTransportOut());
+                               
response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+                                               
.getProperty(MessageContext.TRANSPORT_IN));
+
+                               response.setServerSide(false);
+
+                               //If request is REST we assume the response is 
REST, so set the
+                               // variable
+                               response.setDoingREST(msgCtx.isDoingREST());
+                               response.setServiceGroupContextId(msgCtx
+                                               .getServiceGroupContextId());
+                               response
+                                               
.setServiceGroupContext(msgCtx.getServiceGroupContext());
+                               
response.setServiceContext(msgCtx.getServiceContext());
+                               
response.setAxisService(msgCtx.getAxisService());
+                               
response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
+
+                               //setting the in-flow.
+                               //ArrayList inPhaseHandlers =
+                               // 
response.getAxisOperation().getRemainingPhasesInFlow();
+                               /*
+                                * if (inPhaseHandlers==null || 
inPhaseHandlers.isEmpty()) {
+                                * ArrayList phases =
+                                * 
msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
+                                * 
response.getAxisOperation().setRemainingPhasesInFlow(phases); }
+                                */
+
+                               //Changed following from TransportUtils to 
SandeshaUtil since
+                               // op.
+                               // context is anavailable.
+                               SOAPEnvelope resenvelope = null;
+                               resenvelope = 
SandeshaUtil.createSOAPMessage(response, msgCtx
+                                               
.getEnvelope().getNamespace().getName());
+
+                               if (resenvelope != null) {
+                                       AxisEngine engine = new 
AxisEngine(msgCtx
+                                                       
.getConfigurationContext());
+                                       response.setEnvelope(resenvelope);
+                                       engine.receive(response);
+                               }
                        }
-               }
-               
-               }catch (Exception e) {
-                       System.out.println("Exception was throws in processing 
the sync response...");
+
+               } catch (Exception e) {
+                       System.out
+                                       .println("Exception was throws in 
processing the sync response...");
                }
        }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to