Author: parsonsd
Date: Fri Apr  3 18:27:57 2009
New Revision: 761754

URL: http://svn.apache.org/viewvc?rev=761754&view=rev
Log:
Fix to allow automatic reallocation of sequences that have timed out or been 
deleted.  The solution is to have a reallocated RMSBean point at the RMSBean 
created as part of the reallocation via a new RMSBean attribute that contains 
the internalSeqID of the newly created RMSBean.

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
 Fri Apr  3 18:27:57 2009
@@ -245,6 +245,23 @@
                String ENDPOINT = "Endpoint";
                
                String UNSUPPORTED_ELEMENT = "UnsupportedElement";
+               
+               //This is to identify an RMSBean that hasn't been reallocated
+               int NOT_REALLOCATED = 0;
+               
+               //This is to identify an RMSBean that is to be reallocated or has been reallocated
+               int REALLOCATED = 1;
+               
+               //This is to identify an RMSBean that was created for reallocation but then was reallocated itself
+               //That way we know it can be deleted
+               int ORIGINAL_REALLOCATED_BEAN_COMPLETE = 2;
+               
+               //This is to identify the RMS Bean that was created to reallocate another RMSBean
+               int RMS_BEAN_USED_FOR_REALLOCATION = 3;
+               
+               //This is to identify an RMSBean that was attempted to be reallocated but for some reason the reallocation failed.
+               int REALLOCATION_FAILED = -1;
+               
        }
 
        public interface WSA {

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
 Fri Apr  3 18:27:57 2009
@@ -443,8 +443,19 @@
                        if (terminatedSequence) {               
                                // Delete the rmsBean
                                
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+                               
+                               if(tran != null && tran.isActive()) 
tran.commit();
+                               tran = storageManager.getTransaction();         
                
+                               
+                               //Need to check if it's an RMSBean created for reallocation.  If so we need to
+                               //delete the original RMSBean that was reallocated.
+                               RMSBean reallocatedRMSBean = 
SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, 
rmsBean.getInternalSequenceID());
+                               if(reallocatedRMSBean != null){                 
                
+                                       if (log.isDebugEnabled())
+                                               log.debug("Removing Reallocated 
RMSBean " + reallocatedRMSBean);
+                                       
storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+                               }
                        }
-                       
                        if(tran != null && tran.isActive()) tran.commit();
                        tran = null;
                

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Fri Apr  3 18:27:57 2009
@@ -79,8 +79,8 @@
        public static final String propertyInvalidValue="propertyInvalidValue";
        public static final String invalidRange="invalidRange";
        public static final String workAlreadyAssigned="workAlreadyAssigned";
-       public static final String reallocationFailed="reallocationFailed"; 
-
+       public static final String reallocationFailed="reallocationFailed";
+       public static final String 
reallocationForSyncRequestReplyNotSupported="reallocationForSyncRequestReplyNotSupported";
 
        public static final String 
rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";
        public static final String unknownWSAVersion="unknownWSAVersion";

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Fri Apr  3 18:27:57 2009
@@ -177,8 +177,6 @@
                if (msgContext.getMessageID() == null)
                        msgContext.setMessageID(SandeshaUtil.getUUID());
 
-               
-
                /*
                 * Internal sequence id is the one used to refer to the 
sequence (since
                 * actual sequence id is not available when first msg arrives) 
server
@@ -230,13 +228,47 @@
 
                RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
 
+               boolean autoStartNewSeqForReallocation = false;
                //if this is an existing sequence then we need to do some 
checks first
                if(rmsBean != null)
                {
+                       //If the sequence has been reallocated we need to find out the new internalSeqID.
+                       //If the internalSeqID hasn't been set yet we should auto restart.  If it has a new
+                       //internalSeqID we just send the message on the new reallocated sequence.
+                       int seqReallocated = rmsBean.isReallocated();
+                       if(seqReallocated == 
Sandesha2Constants.WSRM_COMMON.REALLOCATED){
+                               if (log.isDebugEnabled())
+                                       log.debug("ApplicationMsgProcessor: 
Reallocated Sequence: " + rmsBean.getSequenceID());
+                               //Try and get the new internalSeqID
+                               internalSequenceId = 
rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+                               if(internalSequenceId != null){
+                                       if (log.isDebugEnabled())
+                                               
log.debug("ApplicationMsgProcessor: InternalSeqID of new sequence: " + 
internalSequenceId);
+                                       
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceId);
+                                       rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
+                               } else {
+                                       autoStartNewSeqForReallocation = true;
+                               }
+                       } else if(seqReallocated == 
Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED){
+                               //We can't do anymore as we have already tried to reallocate this sequence.
+                               throw new 
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed,
 rmsBean.getSequenceID(),
+                                               "We have already attempted to 
reallocate this Sequence and we won't try again.  The sequance needs to be 
cleaned up manually."));
+                       }
+
                        //see if the sequence is closed
-                       if(rmsBean.isSequenceClosedClient() || 
rmsBean.isTerminateAdded() || rmsBean.isTimedOut()){
+                       if(rmsBean.isSequenceClosedClient() || 
rmsBean.isTerminateAdded() || rmsBean.isTimedOut() || 
autoStartNewSeqForReallocation){
                                
if(SandeshaUtil.isAutoStartNewSequence(msgContext)){
                                        internalSequenceId = 
getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence
+                                       if(autoStartNewSeqForReallocation){
+                                               if (log.isDebugEnabled())
+                                                       
log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation: 
InternalSeqID of new sequence used for reallocation: " 
+                                                                               
+ internalSequenceId);
+                                               
rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId);
+                                               
storageManager.getRMSBeanMgr().update(rmsBean);
+                                               
+                                               if(tran != null && 
tran.isActive()) tran.commit();
+                                               tran = 
storageManager.getTransaction();         
+                                       }
                                        if (log.isDebugEnabled())
                                                
log.debug("ApplicationMsgProcessor: auto start new sequence " + 
internalSequenceId + " :: " + rmsBean);
                                        //set this new internal sequence ID on 
the msg
@@ -337,6 +369,11 @@
                                        if (rmsBean == null) {
                                                rmsBean = 
SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, 
storageManager);
                                                rmsBean = 
addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+
+                                               
if(autoStartNewSeqForReallocation){
+                                                       
rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.RMS_BEAN_USED_FOR_REALLOCATION);
+                                               }
+
                                                if(rmsBean != null) 
outSequenceID = rmsBean.getSequenceID();
                                                
                                                if (rmsBean == null && 
appMsgProcTran != null && appMsgProcTran.isActive()) {
@@ -348,7 +385,6 @@
                                                        appMsgProcTran = 
storageManager.getTransaction();
                                                }
                                        }
-
                                }
        
                        } else {
@@ -554,6 +590,7 @@
                
                if (log.isDebugEnabled())
                        log.debug("Exit: 
ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+
                return true;
        }
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Fri Apr  3 18:27:57 2009
@@ -154,7 +154,7 @@
                if(!rmsBeanMgr.update(rmsBean)){                        
                        //Im not setting the createSeqBean sender bean to 
resend true as the reallocation of msgs will do this
                        try{
-                               TerminateManager.terminateSendingSide(rmsBean, 
storageManager, true);
+                               TerminateManager.terminateSendingSide(rmsBean, 
storageManager, true, transaction);
                        } catch(Exception e){
                                if (log.isDebugEnabled())
                                        log.debug(e);                           
        

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 Fri Apr  3 18:27:57 2009
@@ -74,7 +74,7 @@
                        }
                }
 
-               TerminateManager.terminateSendingSide (rmsBean, storageManager, 
false);
+               TerminateManager.terminateSendingSide (rmsBean, storageManager, 
false, null);
                
                // Stop this message travelling further through the Axis runtime
                terminateResRMMsg.pause();

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java
 Fri Apr  3 18:27:57 2009
@@ -19,6 +19,7 @@
 
 package org.apache.sandesha2.storage.beans;
 
+import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.util.Range;
 import org.apache.sandesha2.util.RangeString;
 
@@ -152,6 +153,22 @@
         * be ignored within the match method.
         */
        private int rmsFlags = 0;
+       
+       /**
+        * Indicates the reallocation state.  The states can be either:
+        * notReallocated - The bean hasn't been reallocated
+        * reallocated - The bean is to be reallocated
+        * ReallocatedBeanComplete - The bean was created for reallocation but is no longer needed as itself has been reallocated
+        * BeanUsedForReallocation - The bean was created for reallocation
+        * ReallocationFailed - The reallocation of this bean failed
+        */
+       private int reallocated = 
Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED;
+       
+       /**
+        * Contains the internalSeqID of the seq that has sent the reallocated msgs
+        */
+       private String internalSeqIDOfSeqUsedForReallocation = null;
+       
        public static final int LAST_SEND_ERROR_TIME_FLAG = 0x00000001;
        public static final int LAST_OUT_MSG_FLAG         = 0x00000010;
        public static final int HIGHEST_OUT_MSG_FLAG      = 0x00000100;
@@ -195,7 +212,9 @@
                 terminationPauserForCS = beanToCopy.isTerminationPauserForCS();
                 timedOut = beanToCopy.isTimedOut();
                 transportTo = beanToCopy.getTransportTo();
-                avoidAutoTermination = beanToCopy.isAvoidAutoTermination();    
        
+                avoidAutoTermination = beanToCopy.isAvoidAutoTermination();    
+                reallocated = beanToCopy.isReallocated();
+                internalSeqIDOfSeqUsedForReallocation = 
beanToCopy.getInternalSeqIDOfSeqUsedForReallocation();
        }
 
        public String getCreateSeqMsgID() {
@@ -434,6 +453,8 @@
                result.append("\nClientCompletedMsgs: "); 
result.append(clientCompletedMessages);
                result.append("\nAnonymous UUID     : "); 
result.append(anonymousUUID);
                result.append("\nSOAPVersion  : "); result.append(soapVersion);
+               result.append("\nReallocated  : "); result.append(reallocated);
+               result.append("\nInternalSeqIDOfSeqUsedForReallocation  : "); 
result.append(internalSeqIDOfSeqUsedForReallocation);
                return result.toString();
        }
        
@@ -478,6 +499,9 @@
                else if(bean.getAnonymousUUID() != null && 
!bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
                        match = false;
                
+               else if((bean.getInternalSeqIDOfSeqUsedForReallocation() != 
null && 
!bean.getInternalSeqIDOfSeqUsedForReallocation().equals(this.getInternalSeqIDOfSeqUsedForReallocation())))
+                       match = false;
+               
 // Avoid matching on the error information
 //             else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 && 
bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
 //                     match = false;
@@ -511,8 +535,26 @@
 
                else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && 
bean.getExpectedReplies() != this.getExpectedReplies())
                        match = false;
+               
+
 
                return match;
        }
 
+       public int isReallocated() {
+               return reallocated;
+       }
+
+       public void setReallocated(int reallocated) {
+               this.reallocated = reallocated;
+       }
+
+       public String getInternalSeqIDOfSeqUsedForReallocation() {
+               return internalSeqIDOfSeqUsedForReallocation;
+       }
+
+       public void setInternalSeqIDOfSeqUsedForReallocation(String 
internalSeqIDOfSeqUsedForReallocation) {
+               this.internalSeqIDOfSeqUsedForReallocation = 
internalSeqIDOfSeqUsedForReallocation;
+       }
+
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
 Fri Apr  3 18:27:57 2009
@@ -619,7 +619,7 @@
                                        if (log.isDebugEnabled())
                                                log.debug("Sending fault 
message " + faultMessageContext.getEnvelope().getHeader());
        
-                                       // Sending the message
+                                       //Sending the message
                                        //having a surrounded try block will 
make sure that the error is logged here 
                                        //and that this does not disturb the 
processing of a carrier message.
                                        try {
@@ -671,7 +671,7 @@
                
        }
        
-       private static InvocationResponse manageIncomingFault (AxisFault fault, 
RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault {
+       private static InvocationResponse manageIncomingFault (AxisFault fault, 
RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws 
AxisFault {
                if (log.isDebugEnabled())
                        log.debug("Enter: FaultManager::manageIncomingFault");
                InvocationResponse response = InvocationResponse.CONTINUE;
@@ -743,7 +743,7 @@
                } else if 
(Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode)
 ||
                                
Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode)
 || 
                                
Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode))
 {
-                       processSequenceUnknownFault(rmMsgCtx, fault, 
identifier);
+                       processSequenceUnknownFault(rmMsgCtx, fault, 
identifier, transaction);
                } 
                
                // If the operation is an Sandesha In Only operation, or the 
fault is a recognised fault,
@@ -783,7 +783,7 @@
 
                // constructing the fault
                AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart, 
rmMsgCtx);
-               response = manageIncomingFault (axisFault, rmMsgCtx, faultPart);
+               response = manageIncomingFault (axisFault, rmMsgCtx, faultPart, 
transaction);
                
                if(transaction != null && transaction.isActive()) 
transaction.commit();
                transaction = null;
@@ -966,7 +966,7 @@
                // Cleanup sending side.
                if (log.isDebugEnabled())
                        log.debug("Terminating sending sequence " + rmsBean);
-               TerminateManager.terminateSendingSide(rmsBean, storageManager, 
false);
+               TerminateManager.terminateSendingSide(rmsBean, storageManager, 
false, null);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
FaultManager::processCreateSequenceRefusedFault");
@@ -980,7 +980,7 @@
         * @param fault
         * @param identifier 
         */
-       private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, 
AxisFault fault, String sequenceID) throws AxisFault {
+       private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, 
AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault {
                if (log.isDebugEnabled())
                        log.debug("Enter: 
FaultManager::processSequenceUnknownFault " + sequenceID);
 
@@ -998,16 +998,16 @@
                        // Cleanup sending side.
                        if (log.isDebugEnabled())
                                log.debug("Terminating sending sequence " + 
rmsBean);
-                       if(!TerminateManager.terminateSendingSide(rmsBean, 
storageManager, true)){
+                       if(!TerminateManager.terminateSendingSide(rmsBean, 
storageManager, true, transaction)){
                                // We did not reallocate so we notify the 
clients of a failure
                                
notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, 
configCtx, fault);
+                               
+                               //Mark the RMSBean as reallocation failed and update last activation time
+                               transaction = storageManager.getTransaction();
+                               
rmsBean.setLastActivatedTime(System.currentTimeMillis());
+                               storageManager.getRMSBeanMgr().update(rmsBean);
+                               if(transaction != null && 
transaction.isActive()) transaction.commit();
                        }
-                       
-                       // Update the last activated time.
-                       
rmsBean.setLastActivatedTime(System.currentTimeMillis());
-                       
-                       // Update the bean in the map
-                       storageManager.getRMSBeanMgr().update(rmsBean);
                }
                else {
                        RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 Fri Apr  3 18:27:57 2009
@@ -44,6 +44,7 @@
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.client.Options;
 import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.client.async.Callback;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -59,6 +60,8 @@
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.engine.Handler;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.util.CallbackReceiver;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,6 +77,7 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -1015,10 +1019,22 @@
                            
                return targetEnv;
        }
-       
-       public static void reallocateMessagesToNewSequence(StorageManager 
storageManager, RMSBean oldRMSBean, List<MessageContext> msgsToSend)throws 
AxisFault{
-           if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
-               log.debug("Enter: 
SandeshaUtil::reallocateMessagesToNewSequence");
+
+
+       /** 
+       * ReallocateMessages to a new sequence
+       * @param storageManager
+       * @param oldRMSBean
+       * @param msgsToSend
+       * @param transaction
+       * 
+       */
+       public static void reallocateMessagesToNewSequence(StorageManager 
storageManager, RMSBean oldRMSBean, 
+               List<MessageContext> msgsToSend, Transaction transaction)
+               throws AxisFault, SandeshaException{
+
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Enter: 
SandeshaUtil::reallocateMessagesToNewSequence");
            
                ConfigurationContext ctx = storageManager.getContext();
                ServiceClient client = new ServiceClient(ctx,  null);
@@ -1027,30 +1043,68 @@
                Options options = client.getOptions();
                options.setTo(oldRMSBean.getToEndpointReference());
                options.setReplyTo(oldRMSBean.getReplyToEndpointReference());
-               
-        //internal sequence ID is different
-        String internalSequenceID = oldRMSBean.getInternalSequenceID();
-        //we also need to obtain the sequenceKey from the internalSequenceID.
-        String oldSequenceKey = 
-          
SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, 
oldRMSBean.getToEndpointReference().getAddress());
-        //remove the old sequence key from the internal sequence ID
-        internalSequenceID = internalSequenceID.substring(0, 
internalSequenceID.length()-oldSequenceKey.length());
-        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, 
-                       SandeshaUtil.getUUID()); //using a new sequence Key to 
differentiate from the old sequence 
-        
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceID);
-        options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, 
oldRMSBean.getRMVersion());
-       
options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, 
Boolean.FALSE);
-       
-        //send the msgs - this will setup a new sequence to the same endpoint
-       Iterator<MessageContext> it = msgsToSend.iterator();
-       while(it.hasNext()){
-               MessageContext msgCtx = (MessageContext)it.next();
-               client.getOptions().setAction(msgCtx.getWSAAction());
-               
client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
-       }
-       
-           if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
-               log.debug("Exit: 
SandeshaUtil::reallocateMessagesToNewSequence");
+
+               //internal sequence ID is different
+               String internalSequenceID = oldRMSBean.getInternalSequenceID();
+
+               
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceID);
+               options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, 
oldRMSBean.getRMVersion());
+               
options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, 
Boolean.FALSE);
+
+               //Update the RMSBean so as to mark it as reallocated if it 
isn't an RMSbean created for a previous reallocation
+               RMSBean originallyReallocatedRMSBean = 
SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, 
oldRMSBean.getInternalSequenceID());
+               if(originallyReallocatedRMSBean == null){
+                       
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATED);
+                       storageManager.getRMSBeanMgr().update(oldRMSBean);
+               } else {
+                       
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 originallyReallocatedRMSBean.getInternalSequenceID());
+                       
originallyReallocatedRMSBean.setInternalSeqIDOfSeqUsedForReallocation(null);    
+                       
storageManager.getRMSBeanMgr().update(originallyReallocatedRMSBean);
+
+                       //Setting this property so that the bean can be deleted
+                       
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE);
+                       
oldRMSBean.setInternalSeqIDOfSeqUsedForReallocation(originallyReallocatedRMSBean.getInternalSequenceID());
+                       storageManager.getRMSBeanMgr().update(oldRMSBean);
+               }
+
+               //Commit current transaction that wraps the manageFaultMsg as 
we are about to start
+               //resending msgs on a new seq and they will need to get a 
transaction on the 
+               //current thread
+               if(transaction != null && transaction.isActive()) 
transaction.commit();
+
+               //send the msgs - this will setup a new sequence to the same 
endpoint
+               Iterator<MessageContext> it = msgsToSend.iterator();
+
+               while(it.hasNext()){
+                       MessageContext msgCtx = (MessageContext)it.next();
+
+                       //Set the action
+                       client.getOptions().setAction(msgCtx.getWSAAction());
+
+                       //Set the message ID
+                       client.getOptions().setMessageId(msgCtx.getMessageID());
+
+                       //Get the AxisOperation
+                       AxisOperation axisOperation = msgCtx.getAxisOperation();
+
+                       //If it's oneway or async, reallocate
+                       //Fail if replyTo is anonymous as this is currently 
not supported because in two-way we can't get responses back to the original caller
+                       if(axisOperation.getAxisSpecificMEPConstant() == 
WSDLConstants.MEP_CONSTANT_OUT_ONLY){
+                               
client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
+                       } else if 
(client.getOptions().getReplyTo().hasAnonymousAddress()){
+                               
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+                               
storageManager.getRMSBeanMgr().update(oldRMSBean);
+                               throw new 
SandeshaException(SandeshaMessageKeys.reallocationForSyncRequestReplyNotSupported);
+                       } else {
+                               MessageReceiver msgReceiver = 
axisOperation.getMessageReceiver();
+                               Object callback = 
((CallbackReceiver)msgReceiver).lookupCallback(msgCtx.getMessageID());        
+                               client.setAxisService(msgCtx.getAxisService());
+                               
client.sendReceiveNonBlocking(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement(),
 (Callback)callback);
+                       }
+               }
+
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: 
SandeshaUtil::reallocateMessagesToNewSequence");
        }
 
   /**
@@ -1276,4 +1330,16 @@
                return result;
        }
 
+       public static RMSBean isLinkedToReallocatedRMSBean(StorageManager 
storageManager, String internalSeqID) throws SandeshaException {
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Enter: 
SandeshaUtil::isLinkedToReallocatedRMSBean");
+ 
+               //Need to check if it's an RMSBean created for reallocation.
+               RMSBean finderBean = new RMSBean();
+               
finderBean.setInternalSeqIDOfSeqUsedForReallocation(internalSeqID);
+               RMSBean reallocatedRMSBean = 
storageManager.getRMSBeanMgr().findUnique(finderBean);
+       
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Enter: 
SandeshaUtil::isLinkedToReallocatedRMSBean, ReallocatedRMSBean: " + 
reallocatedRMSBean);
+               return reallocatedRMSBean;
+       }
+
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
 Fri Apr  3 18:27:57 2009
@@ -25,8 +25,11 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFault;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -40,12 +43,13 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 
 /**
@@ -231,14 +235,14 @@
         * @return true if the reallocation happened successfully
         */
        public static boolean terminateSendingSide(RMSBean rmsBean, 
-                       StorageManager storageManager, boolean reallocate) 
throws SandeshaException {
+                       StorageManager storageManager, boolean reallocate, 
Transaction transaction) throws SandeshaException {
 
                // Indicate that the sequence is terminated
                rmsBean.setTerminated(true);
                rmsBean.setTerminateAdded(true);
                storageManager.getRMSBeanMgr().update(rmsBean);
                
-               return cleanSendingSideData (rmsBean.getInternalSequenceID(), 
storageManager, rmsBean, reallocate);
+               return cleanSendingSideData (rmsBean.getInternalSequenceID(), 
storageManager, rmsBean, reallocate, transaction);
        }
 
        public static void timeOutSendingSideSequence(String internalSequenceId,
@@ -249,11 +253,11 @@
                rmsBean.setLastActivatedTime(System.currentTimeMillis());
                storageManager.getRMSBeanMgr().update(rmsBean);
 
-               cleanSendingSideData(internalSequenceId, storageManager, 
rmsBean, false);
+               cleanSendingSideData(internalSequenceId, storageManager, 
rmsBean, false, null);
        }
 
        private static boolean cleanSendingSideData(String internalSequenceId, 
StorageManager storageManager, 
-                       RMSBean rmsBean, boolean reallocateIfPossible) throws 
SandeshaException {
+                       RMSBean rmsBean, boolean reallocateIfPossible, 
Transaction transaction) throws SandeshaException {
 
                if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
                        log.debug("Enter: 
TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + 
reallocateIfPossible);
@@ -274,12 +278,15 @@
                if(ranges.length==1){
                        //the sequence is a single contiguous acked range
                        lastAckedMsg = ranges[0].upperValue;
-               }
-               else{
-                       //cannot reallocate as there are gaps
-                       reallocateIfPossible=false;
-                       if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
-                               log.debug("cannot reallocate sequence as there 
are gaps");
+               } else{
+                       if(reallocateIfPossible){
+                               //cannot reallocate as there are gaps
+                               
rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+                               storageManager.getRMSBeanMgr().update(rmsBean);
+                               reallocateIfPossible=false;
+                               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                                       log.debug("cannot reallocate sequence 
as there are gaps");
+                       }
                }
                
                while (iterator.hasNext()) {
@@ -332,14 +339,48 @@
                
                if(reallocateIfPossible){
                        try{
-                             
SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, 
msgsToReallocate);  
-                             reallocatedOK = true;
-                       }
-                       catch(Exception e){
-                               //want that the reallocation failed
+                               
SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, 
msgsToReallocate, transaction);   
+                               reallocatedOK = true;
+                       
+                               //If the reallocation was successful and the 
RMSBean being reallocated was originally created for reallocation
+                               //the RMSBean can be deleted.
+                               transaction = storageManager.getTransaction();
+                               if(rmsBean.isReallocated() == 
Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE){
+                                       
rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED);
+                                       
storageManager.getRMSBeanMgr().update(rmsBean);
+                               }
+                               
+                               if(transaction != null && 
transaction.isActive()) transaction.commit();
+                               transaction = null;
+                       } catch(Exception e){
+                               
                                if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                                        
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed,
 rmsBean.getSequenceID(), e.toString()));                              
-                       }                       
+                       
+                               //Reallocation Failed
+                               //Need to mark any RMSBeans involved as failed 
so that we don't attempt to send
+                               //any more messages on these sequences.  The client 
will have to manually reallocate and
+                               //administer the sequences.
+                               transaction = storageManager.getTransaction();
+                               
+                               
rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+                               storageManager.getRMSBeanMgr().update(rmsBean);
+                               
+                               String intSeqIDOfOriginallyReallocatedSeq = 
rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
+                               if(intSeqIDOfOriginallyReallocatedSeq != null){
+                                       RMSBean origRMSBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
intSeqIDOfOriginallyReallocatedSeq);
+                                       
origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
+                                       
storageManager.getRMSBeanMgr().update(origRMSBean);
+                               }
+                               
+                                if(transaction != null && 
transaction.isActive()) transaction.commit();
+                                       transaction = null;
+                               
+                       } finally {
+                               if (transaction != null && 
transaction.isActive()) {
+                                       transaction.rollback();
+                               }
+                       }               
                }
                
                if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 Fri Apr  3 18:27:57 2009
@@ -427,7 +427,7 @@
 
        private void deleteRMSBeans(List<RMSBean> rmsBeans, SandeshaPolicyBean 
propertyBean, long deleteTime)
 
-       throws SandeshaStorageException {
+       throws SandeshaStorageException, SandeshaException {
                if (log.isDebugEnabled())
                        log.debug("Enter: Sender::deleteRMSBeans");
 
@@ -437,12 +437,24 @@
                        RMSBean rmsBean = (RMSBean) beans.next();
                        long timeNow = System.currentTimeMillis();
                        long lastActivated = rmsBean.getLastActivatedTime();
+
                        // delete sequences that have been timed out or deleted 
for more than
                        // the SequenceRemovalTimeoutInterval
-
-                       if ((lastActivated + deleteTime) < timeNow) {
+                       if (((lastActivated + deleteTime) < timeNow) &&
+                               (rmsBean.isReallocated() == 
Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED)) {
                                if (log.isDebugEnabled())
                                        log.debug("Removing RMSBean " + 
rmsBean);
+
+                               //Need to check if it's an RMSBean created for 
reallocation.  If so we need to
+                               //delete the original RMSBean that was 
reallocated.
+                               RMSBean reallocatedRMSBean = 
SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, 
rmsBean.getInternalSequenceID());
+                               
+                               if(reallocatedRMSBean != null){
+                                       if (log.isDebugEnabled())
+                                               log.debug("Removing Reallocated 
RMSBean " + reallocatedRMSBean);
+                                       
storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID());
+                               }
+
                                
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
                                
storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
                        }
@@ -616,7 +628,7 @@
                                        
                                        // Mark the sequence as terminated
                                        RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromSequenceId(manager, id);
-                                       
TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+                                       
TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
                                        
                                        if(log.isDebugEnabled()) 
log.debug("Sender::checkForOrphanMessages.  Orphaned message of type 
TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found.  Deleting this message with a 
sequence ID of : " + id);
                                        // Delete the terminate sender bean.

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 Fri Apr  3 18:27:57 2009
@@ -418,7 +418,7 @@
                                        String sequenceID = 
terminateSequence.getIdentifier().getIdentifier();
        
                                        RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
-                                       
TerminateManager.terminateSendingSide(rmsBean, storageManager, false);
+                                       
TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
                                        
                                        if(transaction != null && 
transaction.isActive()) transaction.commit();
                                        transaction = null;

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=761754&r1=761753&r2=761754&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
 Fri Apr  3 18:27:57 2009
@@ -82,7 +82,8 @@
 msgContextNotSet=Sandesha2 Internal Error: ''MessageContext'' is null.
 transportOutNotPresent=Sandesha2 Internal Error: original transport sender is 
not present.
 workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. 
Will try the next one.
-reallocationFailed=The sequence ''{0}'' could not be reallocated due to the 
error ''{1}''.
+reallocationFailed=Reallocation of msgs from sequence ''{0}'' failed, ''{1}''.
+reallocationForSyncRequestReplyNotSupported=Reallocation for sync requestReply 
not supported.
 couldNotFindOperation=Could not find operation for message type {0} and spec 
level {1}.
 cannotChooseAcksTo=Could not find an appropriate acksTo for the reply 
sequence, given inbound sequence {0} and bean info {1}.
 cannotChooseSpecLevel=Could not find an appropriate specification level for 
the reply sequence, given inbound sequence {0} and bean info {1}.



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscr...@ws.apache.org
For additional commands, e-mail: sandesha-dev-h...@ws.apache.org

Reply via email to