To check the fix, I'd like to understand what response is being returned when sending out a CreateSequence message that isn't a CreateSequenceResponse - I'm assuming a Reliable Messaging Fault, or is it something else ?
Andrew Gatford Hursley MP211 Telephone : Internal (7) 245743 External 01962 815743 Internet : [EMAIL PROTECTED] [EMAIL PROTECTED] 26/09/2007 17:20 To [EMAIL PROTECTED] cc Subject svn commit: r579708 - in /webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2: msgprocessors/ApplicationMsgProcessor.java storage/beans/RMSBean.java workers/SenderWorker.java Author: deepal Date: Wed Sep 26 09:20:24 2007 New Revision: 579708 URL: http://svn.apache.org/viewvc?rev=579708&view=rev Log: - Fixing issue when we send a CS request and receive non create sequence response , in that case we need to stop sending CS req and need to notify client abt that - When the timeout happen it never going to notify to the client - This commit fix both of the above , Chamikara or someone who expert about the code base please validate the patch Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=579708&r1=579707&r2=579708&view=diff ============================================================================== --- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original) +++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Wed Sep 26 09:20:24 2007 @@ -247,7 +247,6 @@ // with the same internal sequenceid // Check that someone hasn't created the bean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId); - // if first message - setup the sending side sequence - both for the // server and the client sides. if (rmsBean == null) { @@ -363,7 +362,8 @@ } // Update the rmsBean - storageManager.getRMSBeanMgr().update(rmsBean); + rmsBean.setApplicationMessageMessageId(msgContext.getMessageID()); + storageManager.getRMSBeanMgr().update(rmsBean); if(startPolling) { SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean); Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java?rev=579708&r1=579707&r2=579708&view=diff ============================================================================== --- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java (original) +++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/beans/RMSBean.java Wed Sep 26 09:20:24 2007 @@ -144,7 +144,10 @@ */ private boolean avoidAutoTermination = false; - /** + //To store the message id if the outgoing appliction message + private String applicationMessageMessageId ; + + /** * Flags that are used to check if the primitive types on this bean * have been set. If a primitive type has not been set then it will * be ignored within the match method. @@ -512,4 +515,12 @@ return match; } + + public String getApplicationMessageMessageId() { + return applicationMessageMessageId; + } + + public void setApplicationMessageMessageId(String applicationMessageMessageId) { + this.applicationMessageMessageId = applicationMessageMessageId; + } } Modified: webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=579708&r1=579707&r2=579708&view=diff ============================================================================== --- webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original) +++ webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Wed Sep 26 09:20:24 2007 @@ -7,6 +7,9 @@ import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; +import org.apache.axis2.client.async.Callback; +import org.apache.axis2.client.async.AxisCallback; +import org.apache.axis2.util.CallbackReceiver; import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; @@ -17,6 +20,7 @@ import org.apache.axis2.description.AxisOperation; import org.apache.axis2.description.OutOnlyAxisOperation; import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.engine.MessageReceiver; import org.apache.axis2.engine.Handler.InvocationResponse; import org.apache.axis2.transport.RequestResponseTransport; import org.apache.axis2.transport.TransportUtils; @@ -45,13 +49,7 @@ import org.apache.sandesha2.util.SandeshaUtil; import org.apache.sandesha2.util.SpecSpecificConstants; import org.apache.sandesha2.util.TerminateManager; -import org.apache.sandesha2.wsrm.AckRequested; -import org.apache.sandesha2.wsrm.CloseSequence; -import org.apache.sandesha2.wsrm.Identifier; -import org.apache.sandesha2.wsrm.LastMessage; -import org.apache.sandesha2.wsrm.MessageNumber; -import org.apache.sandesha2.wsrm.Sequence; -import org.apache.sandesha2.wsrm.TerminateSequence; +import org.apache.sandesha2.wsrm.*; public class SenderWorker extends SandeshaWorker implements Runnable { @@ -82,7 +80,7 @@ try { StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration()); SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr(); - + transaction = storageManager.getTransaction(); String key = senderBean.getMessageContextRefKey(); @@ -195,8 +193,8 @@ } //if the message belong to the Replay Model, it will be send out only if - - + + boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager); //save changes done @ updateMessage -> MessageRetransmissionAdjuster.adjustRetransmittion storageManager.getSenderBeanMgr().update(senderBean); @@ -210,7 +208,7 @@ transaction.commit(); transaction = null; } - + invokeCallBackObject(storageManager,msgCtx ,"Exit: SenderWorker::run, !continueSending"); return; } @@ -236,7 +234,7 @@ senderBeanMgr.update(bean2); } } - + // have to commit the transaction before sending. This may // get changed when WS-AT is available. if(transaction != null) { @@ -335,10 +333,15 @@ transaction = null; - if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) - checkForSyncResponses(msgCtx); - - if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) + if ((processResponseForFaults || successfullySent) && !msgCtx.isServerSide()) { + boolean validCs = checkForSyncResponses(msgCtx ); + if (!validCs) { + invokeCallBackObject(storageManager,msgCtx , + "Sandesha2 sender thread has not received a valid CreateSequnceResponse"); + } + } + + if ((rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) && (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue()))) { try { @@ -505,8 +508,13 @@ log.debug("Exit: SenderWorker::isAckPiggybackableMsgType, " + piggybackable); return piggybackable; } - - private void checkForSyncResponses(MessageContext msgCtx) { + + /** + * return value will be false if the create sequence fails else it will be true + * @param msgCtx + * @return + */ + private boolean checkForSyncResponses(MessageContext msgCtx ) { if (log.isDebugEnabled()) log.debug("Enter: SenderWorker::checkForSyncResponses, " + msgCtx.getEnvelope().getHeader()); @@ -522,7 +530,7 @@ boolean transportInPresent = (msgCtx.getProperty(MessageContext.TRANSPORT_IN) != null); if (!transportInPresent && (responseMessageContext==null || responseMessageContext.getEnvelope()==null)) { if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses, no response present"); - return; + return true; } //to find out weather the response was built by me. @@ -592,7 +600,7 @@ log.error ("Caught exception", e); } - return; + return true; } //If addressing is disabled we will be adding this message simply as the application response of the request message. @@ -631,7 +639,7 @@ //if the syncResponseWas not built here and the client was not expecting a sync response. We will not try to execute //here. Doing so will cause a double invocation for a async message. if (msgCtx.getOptions().isUseSeparateListener()==true && !syncResponseBuilt) { - return; + return true; } @@ -650,13 +658,22 @@ } } catch (Exception e) { - String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse); - if (log.isWarnEnabled()) - log.warn(message, e); + + String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse); + if (msgCtx != null &&! msgCtx.isServerSide() && + (Sandesha2Constants.SPEC_2005_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction()) + || Sandesha2Constants.SPEC_2007_02.Actions.ACTION_CREATE_SEQUENCE.equals(msgCtx.getSoapAction())) ){ + // We have not received a valid createSequnce reponse for the request we send so we need to terminate the seunce here + return false; + } else { + if (log.isWarnEnabled()) + log.warn(message, e); + } } if (log.isDebugEnabled()) log.debug("Exit: SenderWorker::checkForSyncResponses"); - } + return true; + } private void recordError (Exception e, RMMsgContext outRMMsg, StorageManager storageManager) throws SandeshaStorageException { // Store the Exception as a sequence property to enable the client to lookup the last @@ -702,5 +719,60 @@ } } } - + + private void invokeCallBackObject(StorageManager storageManager, + MessageContext msgCtx, + String message) throws SandeshaStorageException { + Transaction transaction = null; + if (msgCtx.isServerSide()) { + return; + } + try { + transaction = storageManager.getTransaction(); + //terminate message sent using the SandeshaClient. Since the terminate message will simply get the + //InFlow of the reference message get called which could be zero sized (OutOnly operations). + + // terminate sending side if this is the WSRM 1.0 spec. + // If the WSRM versoion is 1.1 termination will happen in the terminate sequence response message. + + String internalSequenceId = (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID); + if (internalSequenceId == null) internalSequenceId = senderBean.getInternalSequenceID(); + if (internalSequenceId != null) { + // Create a new Transaction + transaction = storageManager.getTransaction(); + RMSBean bean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId); + TerminateManager.terminateSendingSide(bean, storageManager); + + OperationContext opCtx = + configurationContext.getOperationContext(bean.getApplicationMessageMessageId()); + if (opCtx != null) { + AxisOperation applicationAxisOperation = opCtx.getAxisOperation(); + if (applicationAxisOperation != null) { + MessageReceiver msgReceiver = applicationAxisOperation.getMessageReceiver(); + if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) { + Object callback = ((CallbackReceiver) msgReceiver) + .lookupCallback(bean.getApplicationMessageMessageId()); + if (callback != null) { + AxisCallback axisCallback = ((AxisCallback) callback); + axisCallback.onError(new Exception(message)); + axisCallback.onComplete(); + } + } + } + } + if (transaction != null && transaction.isActive()) transaction.commit(); + transaction = null; + } + + } catch (Exception e) { + if (log.isWarnEnabled()) + log.warn(e); + } finally { + if (transaction != null && transaction.isActive()) { + transaction.rollback(); + transaction = null; + } + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED] Unless stated otherwise above: IBM United Kingdom Limited - Registered in England and Wales with number 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
