Author: parsonsd Date: Fri Apr 3 11:35:11 2009 New Revision: 761627 URL: http://svn.apache.org/viewvc?rev=761627&view=rev Log: Fixes made to improve compliance with the specs and interop with other implementations: - AckRequests are now added as piggybacks to application msgs (this is needed because implementations don't have to piggyback acks and therefore could only respond to ackRequest msgs). - We can ignore piggybacked ack requests, as Sandesha piggybacks acks at every opportunity. - Offered EPs can't be set to none, so we now use AcksTo as the offered EP if one hasn't been set. - Updated so that we poll for terminateSeqResponse. - Updated so that we offer for 1.1 as well as 1.0. This fixes an interop issue with WCF.
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Fri Apr 3 11:35:11 2009 @@ -47,6 +47,7 @@ import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; import org.apache.sandesha2.storage.beans.RMDBean; import org.apache.sandesha2.storage.beans.SenderBean; +import org.apache.sandesha2.util.AcknowledgementManager; import org.apache.sandesha2.util.FaultManager; import org.apache.sandesha2.util.MsgInitializer; import 
org.apache.sandesha2.util.RMMsgCreator; @@ -98,6 +99,15 @@ //checks weather the ack request was a piggybacked one. boolean piggybackedAckRequest = !(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK_REQUEST); + + //it is a piggybacked ackrequest so we can ignore as we will piggyback acks at every opportunity anyway + if(piggybackedAckRequest){ + if (log.isDebugEnabled()) + log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, it is a piggybacked ackrequest for seq " + + "so we can ignore as we will piggyback an ack " + Boolean.FALSE); + //No need to suspend. Just proceed. + return false; + } String sequenceId = ackRequested.getIdentifier().getIdentifier(); @@ -143,10 +153,7 @@ //creating the ack message. If the ackRequest was a standalone this will be a out (response) message MessageContext ackMsgCtx = null; -// if (piggybackedAckRequest) - ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation); -// else -// ackMsgCtx =MessageContextBuilder.createOutMessageContext (msgContext); + ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation); //setting up the RMMsgContext RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx); @@ -196,65 +203,14 @@ } } else { - //If AcksTo is non-anonymous we will be adding a senderBean entry here. The sender is responsible - //for sending it out. - - SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr(); - - String key = SandeshaUtil.getUUID(); - - // dumping to the storage will be done be Sandesha2 Transport Sender - // storageManager.storeMessageContext(key,ackMsgCtx); - - SenderBean ackBean = new SenderBean(); - ackBean.setMessageContextRefKey(key); - ackBean.setMessageID(ackMsgCtx.getMessageID()); - - //acks are sent only once. 
- ackBean.setReSend(false); - - ackBean.setSequenceID(sequenceId); - - EndpointReference to = ackMsgCtx.getTo(); - if (to!=null) - ackBean.setToAddress(to.getAddress()); - - // this will be set to true in the sender. - ackBean.setSend(true); - - ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE); - ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK); SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation()); long ackInterval = propertyBean.getAcknowledgementInterval(); - // Ack will be sent as stand alone, only after the ackknowledgement interval + // Ack will be sent as stand alone, only after the acknowledgement interval long timeToSend = System.currentTimeMillis() + ackInterval; - // removing old acks. - SenderBean findBean = new SenderBean(); - findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK); - Collection<SenderBean> coll = senderBeanMgr.find(findBean); - Iterator<SenderBean> it = coll.iterator(); - - if (it.hasNext()) { - SenderBean oldAckBean = (SenderBean) it.next(); - // If there is an old Ack. This Ack will be sent in the old timeToSend. - timeToSend = oldAckBean.getTimeToSend(); - senderBeanMgr.delete(oldAckBean.getMessageID()); - } - - ackBean.setTimeToSend(timeToSend); - - msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE); - - // passing the message through sandesha2sender - SandeshaUtil.executeAndStore(ackRMMsgCtx, key, storageManager); - - // inserting the new Ack. 
- senderBeanMgr.insert(ackBean); - - msgContext.pause(); + AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager); } if (log.isDebugEnabled()) Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri Apr 3 11:35:11 2009 @@ -178,58 +178,66 @@ // offered seq id String offeredSequenceID = offer.getIdentifer().getIdentifier(); + + //Need to see if this is a duplicate offer. + //If it is we can't accept the offer as we can't be sure it has come from the same client. 
+ RMSBean finderBean = new RMSBean (); + finderBean.setSequenceID(offeredSequenceID); + RMSBean rMSBean = storageManager.getRMSBeanMgr().findUnique(finderBean); + boolean offerAccepted = false; + String outgoingSideInternalSequenceId = SandeshaUtil + .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID()); + + if(rMSBean != null){ + if (log.isDebugEnabled()) + log.debug("Duplicate offer so we can't accept as we can't be sure it's from the same client: " + offeredSequenceID); + offerAccepted = false; + } else { + boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager); + offerAccepted = true; - boolean isValidseqID = isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager); - boolean offerAccepted = true; - - RMSBean rMSBean = null; - //Before processing this offer any further we need to perform some extra checks - //on the offered EP if WS-RM Spec 1.1 is being used - if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){ - Endpoint endpoint = offer.getEndpoint(); - if (endpoint!=null) { - //Check to see if the offer endpoint has a value of WSA Anonymous - String addressingNamespace = (String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION); - String endpointAddress = endpoint.getEPR().getAddress(); - if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){ - //We will still accept this offer but we should warn the user that this MEP is not always reliable or efficient - if (log.isDebugEnabled()) - log.debug("CSeq msg contains offer with an anonymous EPR"); - log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning, createSeqRMMsg.getMessageContext().getMessageID(), - offeredSequenceID)); - } - - rMSBean = new RMSBean(); - //Set the offered EP - rMSBean.setOfferedEndPoint(endpointAddress); + //Before processing this offer any further we need to perform some extra checks + //on the 
offered EP if WS-RM Spec 1.1 is being used + if(isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){ + Endpoint endpoint = offer.getEndpoint(); + if (endpoint!=null) { + //Check to see if the offer endpoint has a value of WSA Anonymous + String addressingNamespace = (String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION); + String endpointAddress = endpoint.getEPR().getAddress(); + if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){ + //We will still accept this offer but we should warn the user that this MEP is not always reliable or efficient + if (log.isDebugEnabled()) + log.debug("CSeq msg contains offer with an anonymous EPR"); + log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning, createSeqRMMsg.getMessageContext().getMessageID(), + offeredSequenceID)); + } + rMSBean = new RMSBean(); //Set the offered EP + rMSBean.setOfferedEndPoint(endpointAddress); - } else { - //Don't accept the offer - if (log.isDebugEnabled()) - log.debug("Offer Refused as it included a null endpoint"); - offerAccepted = false; + } else { + //Don't accept the offer + if (log.isDebugEnabled()) + log.debug("Offer Refused as it included a null endpoint"); + offerAccepted = false; + } + } else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){ + rMSBean = new RMSBean(); } - } else if (isValidseqID && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){ - rMSBean = new RMSBean(); - } - - String outgoingSideInternalSequenceId = SandeshaUtil - .getOutgoingSideInternalSequenceID(rmdBean.getSequenceID()); - - if(isValidseqID){ - // Setting the CreateSequence table entry for the outgoing - // side. 
- rMSBean.setSequenceID(offeredSequenceID); - rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId); - // this is a dummy value - rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); - - //Try inserting the new RMSBean - if(!storageManager.getRMSBeanMgr().insert(rMSBean)){ - offerAccepted = false; + if(isValidseqID){ + // Setting the CreateSequence table entry for the outgoing + // side. + rMSBean.setSequenceID(offeredSequenceID); + rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId); + // this is a dummy value + rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); + + //Try inserting the new RMSBean + if(!storageManager.getRMSBeanMgr().insert(rMSBean)){ + offerAccepted = false; + } } } - + if (offerAccepted) { if(rmdBean.getToEndpointReference() != null){ rMSBean.setToEndpointReference(rmdBean.getToEndpointReference()); Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri Apr 3 11:35:11 2009 @@ -199,6 +199,7 @@ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) { if(rmsBean.isPollingMode()) { rMDBean.setPollingMode(true); + rMDBean.setReplyToEndpointReference(rmsBean.getReplyToEndpointReference()); } } Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Fri Apr 3 11:35:11 2009 @@ -63,9 +63,10 @@ msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID()); //shedulling a polling request for the response side. - if (rmsBean.getOfferedSequence()!=null) { + String offeredSeq = rmsBean.getOfferedSequence(); + if (offeredSeq!=null) { RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr(); - RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId); + RMDBean rMDBean = rMDBeanMgr.retrieve(offeredSeq); if (rMDBean!=null && rMDBean.isPollingMode()) { PollingManager manager = storageManager.getPollingManager(); Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Fri Apr 3 11:35:11 2009 @@ -153,17 +153,31 @@ } else { if (log.isDebugEnabled()) log.debug("Polling rms " + beanToPoll); + // The sequence is there, but we still only poll if we are expecting reply messages, 
- // or if we don't have clean ack state. (We assume acks are clean, and only unset + // if we don't have clean ack state or we are waiting for a terminateSeqResponse. (We assume acks are clean, and only unset // this if we find evidence to the contrary). boolean cleanAcks = true; - if (beanToPoll.getNextMessageNumber() > -1) + boolean waitingForTerminateSeqResponse = false; + long repliesExpected = 0; + + if (!force && beanToPoll.getNextMessageNumber() > -1) { cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber()); - long repliesExpected = beanToPoll.getExpectedReplies(); + if(cleanAcks){ + repliesExpected = beanToPoll.getExpectedReplies(); + if(repliesExpected == 0){ + //Need to check if we are waiting for a terminateSeqResponse + if(beanToPoll.isTerminated() == false && beanToPoll.isTerminateAdded() == true){ + waitingForTerminateSeqResponse = true; + } + } + } + } + if(beanToPoll.getSequenceID() != null){ - if((force || !cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null){ + if((force || !cleanAcks || repliesExpected > 0 || waitingForTerminateSeqResponse) && beanToPoll.getReferenceMessageStoreKey() != null){ pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry); - } + } } else { //If seqID is null on RMS bean then it must be an RMSBean waiting for a createSeqResponse and we want to poll for these pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry); @@ -233,7 +247,7 @@ wireSeqId = rmBean.getSequenceID(); //this case could make us non-RSP compliant } - if(log.isDebugEnabled()) log.debug("Debug: PollingManager::pollForSequence, wireAddress=" + wireAddress + ", wireSeqId=" + wireSeqId); + if(log.isDebugEnabled()) log.debug("Debug: PollingManager::pollForSequence, 
wireAddress=" + wireAddress + " wireSeqId=" + wireSeqId); MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context); if(referenceMessage!=null){ Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java Fri Apr 3 11:35:11 2009 @@ -51,6 +51,7 @@ import org.apache.sandesha2.storage.beans.RMSBean; import org.apache.sandesha2.storage.beans.RMSequenceBean; import org.apache.sandesha2.wsrm.Accept; +import org.apache.sandesha2.wsrm.AckRequested; import org.apache.sandesha2.wsrm.AcksTo; import org.apache.sandesha2.wsrm.CloseSequence; import org.apache.sandesha2.wsrm.CloseSequenceResponse; @@ -141,33 +142,35 @@ // Check if this service includes 2-way operations boolean twoWayService = false; AxisService service = applicationMsgContext.getAxisService(); - if (service != null) { - // if the user has specified this sequence as a one way sequence it should not - // append the sequence offer. 
- if (!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty( - SandeshaClientConstants.ONE_WAY_SEQUENCE))) { - Parameter p = service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS); - if (p != null && p.getValue() != null) { - twoWayService = ((Boolean) p.getValue()).booleanValue(); - if (log.isDebugEnabled()) log.debug("RMMsgCreator:: twoWayService " + twoWayService); - } - } - } + if (service != null) { + // if the user has specified this sequence as a one way sequence it should not + // append the sequence offer. + if (!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty( + SandeshaClientConstants.ONE_WAY_SEQUENCE))) { + Parameter p = service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS); + if (p != null && p.getValue() != null) { + twoWayService = ((Boolean) p.getValue()).booleanValue(); + if (log.isDebugEnabled()) log.debug("RMMsgCreator:: twoWayService " + twoWayService); + } + } + } // Adding sequence offer - if present. We send an offer if the client has assigned an - // id, or if we are using WS-RM 1.0 and the service contains out-in MEPs - boolean autoOffer = false; - if(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmNamespaceValue)) { - autoOffer = twoWayService; - //There may not have been a way to confirm if an OUT_IN MEP is being used. - //Therefore doing an extra check to see what Axis is using. If it's OUT_IN then we must offer. + // id, or if the service contains out-in MEPs + boolean autoOffer = twoWayService; + + //There may not have been a way to confirm if an OUT_IN MEP is being used. + //Therefore doing an extra check to see what Axis is using. If it's OUT_IN then we must offer. 
+ if(applicationMsgContext.getOperationContext() != null && applicationMsgContext.getOperationContext().getAxisOperation() != null){ if(applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_IN - || applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){ + || applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant() == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){ autoOffer = true; } - } else { - // We also do some checking at this point to see if MakeConection is required to - // enable WS-RM 1.1, and write a warning to the log if it has been disabled. + } + + // We also do some checking at this point to see if MakeConection is required to + // enable WS-RM 1.1, and write a warning to the log if it has been disabled. + if(Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) { SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration()); if(twoWayService && !policy.isEnableMakeConnection()) { String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.makeConnectionWarning); @@ -187,26 +190,20 @@ Identifier identifier = new Identifier(rmNamespaceValue); identifier.setIndentifer(offeredSequenceId); offerPart.setIdentifier(identifier); - createSequencePart.setSequenceOffer(offerPart); if (Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) { // We are going to send an offer, so decide which endpoint to include EndpointReference offeredEndpoint = (EndpointReference) applicationMsgContext.getProperty(SandeshaClientConstants.OFFERED_ENDPOINT); + //If the offeredEndpoint hasn't been set then use the acksTo of the RMSBean if (offeredEndpoint==null) { - EndpointReference replyTo = applicationMsgContext.getReplyTo(); //using replyTo as the Endpoint if it is not 
specified - - if (replyTo!=null) { - offeredEndpoint = SandeshaUtil.cloneEPR(replyTo); - } - } - // Finally fall back to using an anonymous endpoint - if (offeredEndpoint==null) { - //The replyTo has already been set to a MC anon with UUID and so will use that same one for the offered endpoint - offeredEndpoint = rmsBean.getReplyToEndpointReference(); + offeredEndpoint = rmsBean.getAcksToEndpointReference(); } + Endpoint endpoint = new Endpoint (offeredEndpoint, rmNamespaceValue, addressingNamespace); offerPart.setEndpoint(endpoint); } + + createSequencePart.setSequenceOffer(offerPart); } EndpointReference toEPR = rmsBean.getToEndpointReference(); @@ -549,6 +546,45 @@ if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug("Exit: RMMsgCreator::addAckMessage " + applicationMsg); } + + /** + * Adds an Ack Request for a specific sequence to the given application message. + * + * @param applicationMsg The Message to which the AckRequest will be added + * @param sequenceId - The sequence which we will request the ack for + * @throws SandeshaException + */ + public static void addAckRequest(RMMsgContext applicationMsg, String sequenceId, RMSBean rmsBean) + throws SandeshaException { + if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) + log.debug("Entry: RMMsgCreator::addAckRequest " + sequenceId); + + String rmVersion = rmsBean.getRMVersion(); + String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion); + + AckRequested ackRequest = new AckRequested(rmNamespaceValue); + + Identifier id = new Identifier(rmNamespaceValue); + id.setIndentifer(sequenceId); + ackRequest.setIdentifier(id); + ackRequest.setMustUnderstand(true); + applicationMsg.addAckRequested(ackRequest); + + if (applicationMsg.getWSAAction()==null) { + applicationMsg.setAction(SpecSpecificConstants.getAckRequestAction(rmVersion)); + applicationMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction(rmVersion)); + } + + 
if(applicationMsg.getMessageId() == null) { + applicationMsg.setMessageId(SandeshaUtil.getUUID()); + } + + // Ensure the message also contains the token that needs to be used + secureOutboundMessage(rmsBean, applicationMsg.getMessageContext()); + + if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) + log.debug("Exit: RMMsgCreator::addAckRequest " + applicationMsg); + } public static RMMsgContext createMakeConnectionMessage (RMMsgContext referenceRMMessage, Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Fri Apr 3 11:35:11 2009 @@ -232,6 +232,17 @@ transaction = storageManager.getTransaction(); + //If this is an application msg we need to add an ackRequest to the header + if(messageType == Sandesha2Constants.MessageTypes.APPLICATION){ + //Add an ackRequest + RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, senderBean.getSequenceID()); + RMMsgCreator.addAckRequest(rmMsgCtx, senderBean.getSequenceID(), rmsBean); + if (transaction != null && transaction.isActive()) + transaction.commit(); + + transaction = storageManager.getTransaction(); + } + //if this is a sync RM exchange protocol we always have to add an ack boolean ackPresent = false; Iterator it = rmMsgCtx.getSequenceAcknowledgements(); @@ -628,21 +639,22 @@ int responseMessageType = responseRMMessage.getMessageType(); if(log.isDebugEnabled()) log.debug("inboundMsgType" + responseMessageType + "outgoing message type " + 
messageType); - //if this is an application response msg in response to a make connection then we have to take care with the service context - if ((messageType == Sandesha2Constants.MessageTypes.APPLICATION && responseMessageType == Sandesha2Constants.MessageTypes.APPLICATION) - || responseMessageType != Sandesha2Constants.MessageTypes.APPLICATION) { - if(log.isDebugEnabled()) log.debug("setting service ctx on msg as this is NOT a makeConnection>appResponse exchange pattern"); - responseMessageContext.setServiceContext(msgCtx.getServiceContext()); - } - else{ - //Setting the AxisService object + //if this is an application response or createSeqResponse msg in response to a make connection then we have to take care with the service context + if(messageType == Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG + && (responseMessageType == Sandesha2Constants.MessageTypes.APPLICATION + || responseMessageType == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE)){ + + //Setting the AxisService object responseMessageContext.setAxisService(msgCtx.getAxisService()); - //we cannot set service ctx for application response msgs since the srvc ctx will not match the op ctx, causing + //we cannot set service ctx for application response msgs or createSeqResponse msgs since the srvc ctx will not match the op ctx, causing //problems with addressing if(log.isDebugEnabled()) log.debug("NOT setting service ctx for response type " + messageType + ", current srvc ctx =" + responseMessageContext.getServiceContext()); + }else { + if(log.isDebugEnabled()) log.debug("setting service ctx on msg as this is NOT a makeConnection>appResponse or makeConnection>createSeqResponse exchange pattern"); + responseMessageContext.setServiceContext(msgCtx.getServiceContext()); } - + //If addressing is disabled we will be adding this message simply as the application response of the request message. 
Boolean addressingDisabled = (Boolean) msgCtx.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES); if (addressingDisabled!=null && Boolean.TRUE.equals(addressingDisabled)) { Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java?rev=761627&r1=761626&r2=761627&view=diff ============================================================================== --- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java (original) +++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java Fri Apr 3 11:35:11 2009 @@ -19,6 +19,8 @@ package org.apache.sandesha2.wsrm; +import java.util.Iterator; + import javax.xml.namespace.QName; import org.apache.axiom.om.OMElement; @@ -72,6 +74,15 @@ throw new SandeshaException (message); } + // Sniff the addressing namespace from the Address child of the endpointElement + Iterator children = endpointElement.getChildElements(); + while(children.hasNext() && addressingNamespaceValue == null) { + OMElement child = (OMElement) children.next(); + if("Address".equals(child.getLocalName())) { + addressingNamespaceValue = child.getNamespace().getNamespaceURI(); + } + } + return this; } --------------------------------------------------------------------- To unsubscribe, e-mail: sandesha-dev-unsubscr...@ws.apache.org For additional commands, e-mail: sandesha-dev-h...@ws.apache.org