Author: parsonsd Date: Fri Apr 3 18:27:57 2009 New Revision: 761754 URL: http://svn.apache.org/viewvc?rev=761754&view=rev Log: Fix to allow automatic reallocation of sequences that have timed out or been deleted. The solution is to have a reallocated RMSBean point at the RMSBean created as part of the reallocation via a new RMSBean attribute that contains the internalSeqID of the newly created RMSBean.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Fri Apr 3 18:27:57 2009 @@ -245,6 +245,23 @@ String ENDPOINT = "Endpoint"; String UNSUPPORTED_ELEMENT = "UnsupportedElement"; + + //This is to identify an RMSBean that hasn't been reallocated + int NOT_REALLOCATED = 0; + + //This is to identify an RMSBean that is to be reallocated or has been reallocated + int REALLOCATED = 1; + + //This is to identify an RMSBean that was created for reallocation but then was reallocated itself + //That way we know it can be deleted + int ORIGINAL_REALLOCATED_BEAN_COMPLETE = 2; + + //This is to identify the RMS Bean that was created to reallocate another RMSBean + int RMS_BEAN_USED_FOR_REALLOCATION = 3; + + //This is to identify an RMSBean that was attempted to be reallocated but for some reason the reallocation failed. + int REALLOCATION_FAILED = -1; + } public interface WSA { Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Fri Apr 3 18:27:57 2009 @@ -443,8 +443,19 @@ if (terminatedSequence) { // Delete the rmsBean storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID()); + + if(tran != null && tran.isActive()) tran.commit(); + tran = storageManager.getTransaction(); + + //Need to check if it's an RMSBean created for reallocation. If so we need to + //delete the original RMSBean that was reallocated. + RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID()); + if(reallocatedRMSBean != null){ + if (log.isDebugEnabled()) + log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean); + storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID()); + } } - if(tran != null && tran.isActive()) tran.commit(); tran = null; Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Fri Apr 3 18:27:57 2009 @@ -79,8 +79,8 @@ public static final String propertyInvalidValue="propertyInvalidValue"; public static final String invalidRange="invalidRange"; public static final String workAlreadyAssigned="workAlreadyAssigned"; - public static final String reallocationFailed="reallocationFailed"; - + public static final String reallocationFailed="reallocationFailed"; + public static final String reallocationForSyncRequestReplyNotSupported="reallocationForSyncRequestReplyNotSupported"; public static final String rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence"; public static final String unknownWSAVersion="unknownWSAVersion"; Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri Apr 3 18:27:57 2009 @@ -177,8 +177,6 @@ if (msgContext.getMessageID() == null) msgContext.setMessageID(SandeshaUtil.getUUID()); - - /* * Internal sequence id is the one used to refer to the sequence (since * actual sequence id is not available when first msg arrives) server @@ -230,13 +228,47 @@ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId); + boolean autoStartNewSeqForReallocation = false; //if this is an existing sequence then we need to do some checks first if(rmsBean != null) { + //If the sequence has been reallocated we need to find out the new internalSeqID. + //If the internalSeqID hasn't been set yet we should auto restart. If it has a new + //internalSeqID we just send the message on the new reallocated sequence. + int seqReallocated = rmsBean.isReallocated(); + if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATED){ + if (log.isDebugEnabled()) + log.debug("ApplicationMsgProcessor: Reallocated Sequence: " + rmsBean.getSequenceID()); + //Try and get the new internalSeqID + internalSequenceId = rmsBean.getInternalSeqIDOfSeqUsedForReallocation(); + if(internalSequenceId != null){ + if (log.isDebugEnabled()) + log.debug("ApplicationMsgProcessor: InternalSeqID of new sequence: " + internalSequenceId); + rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceId); + rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId); + } else { + autoStartNewSeqForReallocation = true; + } + } else if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED){ + //We can't do anymore as we have already tried to reallocate this sequence. + throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(), + "We have already attempted to reallocate this Sequence and we won't try again. The sequance needs to be cleaned up manually.")); + } + //see if the sequence is closed - if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut()){ + if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut() || autoStartNewSeqForReallocation){ if(SandeshaUtil.isAutoStartNewSequence(msgContext)){ internalSequenceId = getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence + if(autoStartNewSeqForReallocation){ + if (log.isDebugEnabled()) + log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation: InternalSeqID of new sequence used for reallocation: " + + internalSequenceId); + rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId); + storageManager.getRMSBeanMgr().update(rmsBean); + + if(tran != null && tran.isActive()) tran.commit(); + tran = storageManager.getTransaction(); + } if (log.isDebugEnabled()) log.debug("ApplicationMsgProcessor: auto start new sequence " + internalSequenceId + " :: " + rmsBean); //set this new internal sequence ID on the msg @@ -337,6 +369,11 @@ if (rmsBean == null) { rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager); rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager); + + if(autoStartNewSeqForReallocation){ + rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.RMS_BEAN_USED_FOR_REALLOCATION); + } + if(rmsBean != null) outSequenceID = rmsBean.getSequenceID(); if (rmsBean == null && appMsgProcTran != null && appMsgProcTran.isActive()) { @@ -348,7 +385,6 @@ appMsgProcTran = storageManager.getTransaction(); } } - } } else { @@ -554,6 +590,7 @@ if (log.isDebugEnabled()) log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE); + return true; } Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri Apr 3 18:27:57 2009 @@ -154,7 +154,7 @@ if(!rmsBeanMgr.update(rmsBean)){ //Im not setting the createSeqBean sender bean to resend true as the reallocation of msgs will do this try{ - TerminateManager.terminateSendingSide(rmsBean, storageManager, true); + TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction); } catch(Exception e){ if (log.isDebugEnabled()) log.debug(e); Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Fri Apr 3 18:27:57 2009 @@ -74,7 +74,7 @@ } } - TerminateManager.terminateSendingSide (rmsBean, storageManager, false); + TerminateManager.terminateSendingSide (rmsBean, storageManager, false, null); // Stop this message travelling further through the Axis runtime terminateResRMMsg.pause(); Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java Fri Apr 3 18:27:57 2009 @@ -19,6 +19,7 @@ package org.apache.sandesha2.storage.beans; +import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.util.Range; import org.apache.sandesha2.util.RangeString; @@ -152,6 +153,22 @@ * be ignored within the match method. */ private int rmsFlags = 0; + + /** + * Indicates the reallocation state. The states can be either: + * notReallocated - The bean hasn't been reallocated + * reallocated - The bean is to be reallocated + * ReallocatedBeanComplete - The bean was created for reallocation but is no longer needed as itself has been reallocated + * BeanUsedForReallocation - The bean was created for reallocation + * ReallocationFailed - The reallocation of this bean failed + */ + private int reallocated = Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED; + + /** + * Contains the internalSeqID of the seq that has sent the reallocated msgs + */ + private String internalSeqIDOfSeqUsedForReallocation = null; + public static final int LAST_SEND_ERROR_TIME_FLAG = 0x00000001; public static final int LAST_OUT_MSG_FLAG = 0x00000010; public static final int HIGHEST_OUT_MSG_FLAG = 0x00000100; @@ -195,7 +212,9 @@ terminationPauserForCS = beanToCopy.isTerminationPauserForCS(); timedOut = beanToCopy.isTimedOut(); transportTo = beanToCopy.getTransportTo(); - avoidAutoTermination = beanToCopy.isAvoidAutoTermination(); + avoidAutoTermination = beanToCopy.isAvoidAutoTermination(); + reallocated = beanToCopy.isReallocated(); + internalSeqIDOfSeqUsedForReallocation = beanToCopy.getInternalSeqIDOfSeqUsedForReallocation(); } public String getCreateSeqMsgID() { @@ -434,6 +453,8 @@ result.append("\nClientCompletedMsgs: "); result.append(clientCompletedMessages); result.append("\nAnonymous UUID : "); result.append(anonymousUUID); result.append("\nSOAPVersion : "); result.append(soapVersion); + result.append("\nReallocated : "); result.append(reallocated); + result.append("\nInternalSeqIDOfSeqUsedForReallocation : "); result.append(internalSeqIDOfSeqUsedForReallocation); return result.toString(); } @@ -478,6 +499,9 @@ else if(bean.getAnonymousUUID() != null && !bean.getAnonymousUUID().equals(this.getAnonymousUUID())) match = false; + else if((bean.getInternalSeqIDOfSeqUsedForReallocation() != null && !bean.getInternalSeqIDOfSeqUsedForReallocation().equals(this.getInternalSeqIDOfSeqUsedForReallocation()))) + match = false; + // Avoid matching on the error information // else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 && bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp()) // match = false; @@ -511,8 +535,26 @@ else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && bean.getExpectedReplies() != this.getExpectedReplies()) match = false; + + return match; } + public int isReallocated() { + return reallocated; + } + + public void setReallocated(int reallocated) { + this.reallocated = reallocated; + } + + public String getInternalSeqIDOfSeqUsedForReallocation() { + return internalSeqIDOfSeqUsedForReallocation; + } + + public void setInternalSeqIDOfSeqUsedForReallocation(String internalSeqIDOfSeqUsedForReallocation) { + this.internalSeqIDOfSeqUsedForReallocation = internalSeqIDOfSeqUsedForReallocation; + } + } Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Fri Apr 3 18:27:57 2009 @@ -619,7 +619,7 @@ if (log.isDebugEnabled()) log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader()); - // Sending the message + //Sending the message //having a surrounded try block will make sure that the error is logged here //and that this does not disturb the processing of a carrier message. try { @@ -671,7 +671,7 @@ } - private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart) throws AxisFault { + private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: FaultManager::manageIncomingFault"); InvocationResponse response = InvocationResponse.CONTINUE; @@ -743,7 +743,7 @@ } else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) || Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) || Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode)) { - processSequenceUnknownFault(rmMsgCtx, fault, identifier); + processSequenceUnknownFault(rmMsgCtx, fault, identifier, transaction); } // If the operation is an Sandesha In Only operation, or the fault is a recognised fault, @@ -783,7 +783,7 @@ // constructing the fault AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart, rmMsgCtx); - response = manageIncomingFault (axisFault, rmMsgCtx, faultPart); + response = manageIncomingFault (axisFault, rmMsgCtx, faultPart, transaction); if(transaction != null && transaction.isActive()) transaction.commit(); transaction = null; @@ -966,7 +966,7 @@ // Cleanup sending side. if (log.isDebugEnabled()) log.debug("Terminating sending sequence " + rmsBean); - TerminateManager.terminateSendingSide(rmsBean, storageManager, false); + TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null); if (log.isDebugEnabled()) log.debug("Exit: FaultManager::processCreateSequenceRefusedFault"); @@ -980,7 +980,7 @@ * @param fault * @param identifier */ - private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID) throws AxisFault { + private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID); @@ -998,16 +998,16 @@ // Cleanup sending side. if (log.isDebugEnabled()) log.debug("Terminating sending sequence " + rmsBean); - if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true)){ + if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction)){ // We did not reallocate so we notify the clients of a failure notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault); + + //Mark the RMSBean as reallocation failed and update last activation time + transaction = storageManager.getTransaction(); + rmsBean.setLastActivatedTime(System.currentTimeMillis()); + storageManager.getRMSBeanMgr().update(rmsBean); + if(transaction != null && transaction.isActive()) transaction.commit(); } - - // Update the last activated time. - rmsBean.setLastActivatedTime(System.currentTimeMillis()); - - // Update the bean in the map - storageManager.getRMSBeanMgr().update(rmsBean); } else { RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID); Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Fri Apr 3 18:27:57 2009 @@ -44,6 +44,7 @@ import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.client.Options; import org.apache.axis2.client.ServiceClient; +import org.apache.axis2.client.async.Callback; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.OperationContext; @@ -59,6 +60,8 @@ import org.apache.axis2.engine.AxisConfiguration; import org.apache.axis2.engine.AxisEngine; import org.apache.axis2.engine.Handler; +import org.apache.axis2.engine.MessageReceiver; +import org.apache.axis2.util.CallbackReceiver; import org.apache.axis2.wsdl.WSDLConstants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,6 +77,7 @@ import org.apache.sandesha2.security.SecurityManager; import org.apache.sandesha2.security.SecurityToken; import org.apache.sandesha2.storage.StorageManager; +import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr; import org.apache.sandesha2.storage.beans.RMDBean; import org.apache.sandesha2.storage.beans.RMSBean; @@ -1015,10 +1019,22 @@ return targetEnv; } - - public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean, List<MessageContext> msgsToSend)throws AxisFault{ - if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) - log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence"); + + + /** + * ReallocateMessages to a new sequence + * @param storageManager + * @param oldRMSBean + * @param msgsToSend + * @param transaction + * + */ + public static void reallocateMessagesToNewSequence(StorageManager storageManager, RMSBean oldRMSBean, + List<MessageContext> msgsToSend, Transaction transaction) + throws AxisFault, SandeshaException{ + + if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) + log.debug("Enter: SandeshaUtil::reallocateMessagesToNewSequence"); ConfigurationContext ctx = storageManager.getContext(); ServiceClient client = new ServiceClient(ctx, null); @@ -1027,30 +1043,68 @@ Options options = client.getOptions(); options.setTo(oldRMSBean.getToEndpointReference()); options.setReplyTo(oldRMSBean.getReplyToEndpointReference()); - - //internal sequence ID is different - String internalSequenceID = oldRMSBean.getInternalSequenceID(); - //we also need to obtain the sequenceKey from the internalSequenceID. - String oldSequenceKey = - SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, oldRMSBean.getToEndpointReference().getAddress()); - //remove the old sequence key from the internal sequence ID - internalSequenceID = internalSequenceID.substring(0, internalSequenceID.length()-oldSequenceKey.length()); - options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, - SandeshaUtil.getUUID()); //using a new sequence Key to differentiate from the old sequence - options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID); - options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion()); - options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE); - - //send the msgs - this will setup a new sequence to the same endpoint - Iterator<MessageContext> it = msgsToSend.iterator(); - while(it.hasNext()){ - MessageContext msgCtx = (MessageContext)it.next(); - client.getOptions().setAction(msgCtx.getWSAAction()); - client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement()); - } - - if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) - log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence"); + + //internal sequence ID is different + String internalSequenceID = oldRMSBean.getInternalSequenceID(); + + options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID); + options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, oldRMSBean.getRMVersion()); + options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.FALSE); + + //Update the RMSBean so as to mark it as reallocated if it isn't an RMSbean created for a previous reallocation + RMSBean originallyReallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, oldRMSBean.getInternalSequenceID()); + if(originallyReallocatedRMSBean == null){ + oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATED); + storageManager.getRMSBeanMgr().update(oldRMSBean); + } else { + options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, originallyReallocatedRMSBean.getInternalSequenceID()); + originallyReallocatedRMSBean.setInternalSeqIDOfSeqUsedForReallocation(null); + storageManager.getRMSBeanMgr().update(originallyReallocatedRMSBean); + + //Setting this property so that the bean can be deleted + oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE); + oldRMSBean.setInternalSeqIDOfSeqUsedForReallocation(originallyReallocatedRMSBean.getInternalSequenceID()); + storageManager.getRMSBeanMgr().update(oldRMSBean); + } + + //Commit current transaction that wraps the manageFaultMsg as we are about to start + //resending msgs on a new seq and they will need to get a transaction on the + //current thread + if(transaction != null && transaction.isActive()) transaction.commit(); + + //send the msgs - this will setup a new sequence to the same endpoint + Iterator<MessageContext> it = msgsToSend.iterator(); + + while(it.hasNext()){ + MessageContext msgCtx = (MessageContext)it.next(); + + //Set the action + client.getOptions().setAction(msgCtx.getWSAAction()); + + //Set the message ID + client.getOptions().setMessageId(msgCtx.getMessageID()); + + //Get the AxisOperation + AxisOperation axisOperation = msgCtx.getAxisOperation(); + + //If it's oneway or async, reallocate + //Fail if replyTo is annonymous as this is currently not supported because in twoway we can't get responses back to th eold something + if(axisOperation.getAxisSpecificMEPConstant() == WSDLConstants.MEP_CONSTANT_OUT_ONLY){ + client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement()); + } else if (client.getOptions().getReplyTo().hasAnonymousAddress()){ + oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED); + storageManager.getRMSBeanMgr().update(oldRMSBean); + throw new SandeshaException(SandeshaMessageKeys.reallocationForSyncRequestReplyNotSupported); + } else { + MessageReceiver msgReceiver = axisOperation.getMessageReceiver(); + Object callback = ((CallbackReceiver)msgReceiver).lookupCallback(msgCtx.getMessageID()); + client.setAxisService(msgCtx.getAxisService()); + client.sendReceiveNonBlocking(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement(), (Callback)callback); + } + } + + if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) + log.debug("Exit: SandeshaUtil::reallocateMessagesToNewSequence"); } /** @@ -1276,4 +1330,16 @@ return result; } + public static RMSBean isLinkedToReallocatedRMSBean(StorageManager storageManager, String internalSeqID) throws SandeshaException { + if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isLinkedToReallocatedRMSBean"); + + //Need to check if it's an RMSBean created for reallocation. + RMSBean finderBean = new RMSBean(); + finderBean.setInternalSeqIDOfSeqUsedForReallocation(internalSeqID); + RMSBean reallocatedRMSBean = storageManager.getRMSBeanMgr().findUnique(finderBean); + + if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: SandeshaUtil::isLinkedToReallocatedRMSBean, ReallocatedRMSBean: " + reallocatedRMSBean); + return reallocatedRMSBean; + } + } Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Fri Apr 3 18:27:57 2009 @@ -25,8 +25,11 @@ import java.util.LinkedList; import java.util.List; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFault; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; +import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; @@ -40,12 +43,13 @@ import org.apache.sandesha2.i18n.SandeshaMessageKeys; import org.apache.sandesha2.storage.SandeshaStorageException; import org.apache.sandesha2.storage.StorageManager; +import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr; import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr; import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; -import org.apache.sandesha2.storage.beans.RMSBean; import org.apache.sandesha2.storage.beans.InvokerBean; import org.apache.sandesha2.storage.beans.RMDBean; +import org.apache.sandesha2.storage.beans.RMSBean; import org.apache.sandesha2.storage.beans.SenderBean; /** @@ -231,14 +235,14 @@ * @return true if the reallocation happened sucessfully */ public static boolean terminateSendingSide(RMSBean rmsBean, - StorageManager storageManager, boolean reallocate) throws SandeshaException { + StorageManager storageManager, boolean reallocate, Transaction transaction) throws SandeshaException { // Indicate that the sequence is terminated rmsBean.setTerminated(true); rmsBean.setTerminateAdded(true); storageManager.getRMSBeanMgr().update(rmsBean); - return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate); + return cleanSendingSideData (rmsBean.getInternalSequenceID(), storageManager, rmsBean, reallocate, transaction); } public static void timeOutSendingSideSequence(String internalSequenceId, @@ -249,11 +253,11 @@ rmsBean.setLastActivatedTime(System.currentTimeMillis()); storageManager.getRMSBeanMgr().update(rmsBean); - cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false); + cleanSendingSideData(internalSequenceId, storageManager, rmsBean, false, null); } private static boolean cleanSendingSideData(String internalSequenceId, StorageManager storageManager, - RMSBean rmsBean, boolean reallocateIfPossible) throws SandeshaException { + RMSBean rmsBean, boolean reallocateIfPossible, Transaction transaction) throws SandeshaException { if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Enter: TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + reallocateIfPossible); @@ -274,12 +278,15 @@ if(ranges.length==1){ //the sequence is a single contiguous acked range lastAckedMsg = ranges[0].upperValue; - } - else{ - //cannot reallocate as there are gaps - reallocateIfPossible=false; - if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) - log.debug("cannot reallocate sequence as there are gaps"); + } else{ + if(reallocateIfPossible){ + //cannot reallocate as there are gaps + rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED); + storageManager.getRMSBeanMgr().update(rmsBean); + reallocateIfPossible=false; + if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) + log.debug("cannot reallocate sequence as there are gaps"); + } } while (iterator.hasNext()) { @@ -332,14 +339,48 @@ if(reallocateIfPossible){ try{ - SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate); - reallocatedOK = true; - } - catch(Exception e){ - //want that the reallocation failed + SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate, transaction); + reallocatedOK = true; + + //If the reallocation was successful and the RMSBean being reallocated was originally created for reallocation + //the RMSBean can be deleted. + transaction = storageManager.getTransaction(); + if(rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE){ + rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED); + storageManager.getRMSBeanMgr().update(rmsBean); + } + + if(transaction != null && transaction.isActive()) transaction.commit(); + transaction = null; + } catch(Exception e){ + if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(), e.toString())); - } + + //Reallocation Failed + //Need to mark any RMSBeans involved as failed so that we don't attempt to send + //anymore messages on these seq's. The client will have to manually reallocate and + //administer the sequences. + transaction = storageManager.getTransaction(); + + rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED); + storageManager.getRMSBeanMgr().update(rmsBean); + + String intSeqIDOfOriginallyReallocatedSeq = rmsBean.getInternalSeqIDOfSeqUsedForReallocation(); + if(intSeqIDOfOriginallyReallocatedSeq != null){ + RMSBean origRMSBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, intSeqIDOfOriginallyReallocatedSeq); + origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED); + storageManager.getRMSBeanMgr().update(origRMSBean); + } + + if(transaction != null && transaction.isActive()) transaction.commit(); + transaction = null; + + } finally { + if (transaction != null && transaction.isActive()) { + transaction.rollback(); + } + } } if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Fri Apr 3 18:27:57 2009 @@ -427,7 +427,7 @@ private void deleteRMSBeans(List<RMSBean> rmsBeans, SandeshaPolicyBean propertyBean, long deleteTime) - throws SandeshaStorageException { + throws SandeshaStorageException, SandeshaException { if (log.isDebugEnabled()) log.debug("Enter: Sender::deleteRMSBeans"); @@ -437,12 +437,24 @@ RMSBean rmsBean = (RMSBean) beans.next(); long timeNow = System.currentTimeMillis(); long lastActivated = rmsBean.getLastActivatedTime(); + // delete sequences that have been timedout or deleted for more than // the SequenceRemovalTimeoutInterval - - if ((lastActivated + deleteTime) < timeNow) { + if (((lastActivated + deleteTime) < timeNow) && + (rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED)) { if (log.isDebugEnabled()) log.debug("Removing RMSBean " + rmsBean); + + //Need to check if it's an RMSBean created for reallocation. If so we need to + //delete the original RMSBean that was reallocated. + RMSBean reallocatedRMSBean = SandeshaUtil.isLinkedToReallocatedRMSBean(storageManager, rmsBean.getInternalSequenceID()); + + if(reallocatedRMSBean != null){ + if (log.isDebugEnabled()) + log.debug("Removing Reallocated RMSBean " + reallocatedRMSBean); + storageManager.getRMSBeanMgr().delete(reallocatedRMSBean.getCreateSeqMsgID()); + } + storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID()); storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey()); } @@ -616,7 +628,7 @@ // Mark the sequence as terminated RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(manager, id); - TerminateManager.terminateSendingSide(rmsBean, storageManager, false); + TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null); if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages. Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found. Deleting this message with a sequence ID of : " + id); // Delete the terminate sender bean. Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr 3 18:27:57 2009 @@ -418,7 +418,7 @@ String sequenceID = terminateSequence.getIdentifier().getIdentifier(); RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID); - TerminateManager.terminateSendingSide(rmsBean, storageManager, false); + TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null); if(transaction != null && transaction.isActive()) transaction.commit(); transaction = null; Modified: webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=761754&r1=761753&r2=761754&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties Fri Apr 3 18:27:57 2009 @@ -82,7 +82,8 @@ msgContextNotSet=Sandesha2 Internal Error: ''MessageContext'' is null. transportOutNotPresent=Sandesha2 Internal Error: original transport sender is not present. workAlreadyAssigned=Work ''{0}'' is already assigned to a different Worker. Will try the next one. -reallocationFailed=The sequence ''{0}'' could not be reallocated due to the error ''{1}''. +reallocationFailed=Reallocation of msgs from sequence ''{0}'' failed, ''{1}''. +reallocationForSyncRequestReplyNotSupported=Reallocation for sync requestReply not supported. couldNotFindOperation=Could not find operation for message type {0} and spec level {1}. cannotChooseAcksTo=Could not find an appropriate acksTo for the reply sequence, given inbound sequence {0} and bean info {1}. cannotChooseSpecLevel=Could not find an appropriate specification level for the reply sequence, given inbound sequence {0} and bean info {1}. --------------------------------------------------------------------- To unsubscribe, e-mail: sandesha-dev-unsubscr...@ws.apache.org For additional commands, e-mail: sandesha-dev-h...@ws.apache.org