Author: parsonsd
Date: Fri Apr  3 11:35:11 2009
New Revision: 761627

URL: http://svn.apache.org/viewvc?rev=761627&view=rev
Log:
Fixes made to improve compliance with the specs and interop with other
implementations:
- AckRequests are now added as piggybacks to application msgs (this is needed
as implementations don't have to piggyback acks and therefore could only
respond to ackRequest msgs).
- We can ignore piggybacked ack requests as Sandesha piggybacks acks at every
opportunity
- Offered EPs can't be set to none, so AcksTo is now used as the offered EP
if one hasn't been set
- Updated so that we poll for terminateSeqResponse
- Updated so that we offer for 1.1 as well as 1.0.  This fixes an interop issue 
with WCF.

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Fri Apr  3 11:35:11 2009
@@ -47,6 +47,7 @@
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
@@ -98,6 +99,15 @@
 
                //checks weather the ack request was a piggybacked one.
                boolean piggybackedAckRequest = 
!(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK_REQUEST);
+
+               //it is a piggybacked ackrequest so we can ignore as we will 
piggyback acks at every opportunity anyway
+               if(piggybackedAckRequest){
+                       if (log.isDebugEnabled())
+                       log.debug("Exit: 
AckRequestedProcessor::processAckRequestedHeader, it is a piggybacked 
ackrequest for seq " +
+                                       "so we can ignore as we will piggyback 
an ack " + Boolean.FALSE);
+                       //No need to suspend. Just proceed.
+                       return false;
+               }
                
                String sequenceId = 
ackRequested.getIdentifier().getIdentifier();
 
@@ -143,10 +153,7 @@
                
                //creating the ack message. If the ackRequest was a standalone 
this will be a out (response) message 
                MessageContext ackMsgCtx = null;
-//             if (piggybackedAckRequest)
-                       ackMsgCtx = 
SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
-//             else
-//                     ackMsgCtx 
=MessageContextBuilder.createOutMessageContext (msgContext);
+               ackMsgCtx = 
SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
                        
                //setting up the RMMsgContext
                RMMsgContext ackRMMsgCtx = 
MsgInitializer.initializeMessage(ackMsgCtx);
@@ -196,65 +203,14 @@
                        }
 
                } else {
-                       //If AcksTo is non-anonymous we will be adding a 
senderBean entry here. The sender is responsible 
-                       //for sending it out.
-                       
-                       SenderBeanMgr senderBeanMgr = 
storageManager.getSenderBeanMgr();
-
-                       String key = SandeshaUtil.getUUID();
-
-                       // dumping to the storage will be done be Sandesha2 
Transport Sender
-                       // storageManager.storeMessageContext(key,ackMsgCtx);
-
-                       SenderBean ackBean = new SenderBean();
-                       ackBean.setMessageContextRefKey(key);
-                       ackBean.setMessageID(ackMsgCtx.getMessageID());
-                       
-                       //acks are sent only once.
-                       ackBean.setReSend(false);
-                       
-                       ackBean.setSequenceID(sequenceId);
-                       
-                       EndpointReference to = ackMsgCtx.getTo();
-                       if (to!=null)
-                               ackBean.setToAddress(to.getAddress());
-
-                       // this will be set to true in the sender.
-                       ackBean.setSend(true);
-
-                       
ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-                       
ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
                        SandeshaPolicyBean propertyBean = 
SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
 
                        long ackInterval = 
propertyBean.getAcknowledgementInterval();
 
-                       // Ack will be sent as stand alone, only after the 
ackknowledgement interval
+                       // Ack will be sent as stand alone, only after the 
acknowledgement interval
                        long timeToSend = System.currentTimeMillis() + 
ackInterval;
 
-                       // removing old acks.
-                       SenderBean findBean = new SenderBean();
-                       
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-                       Collection<SenderBean> coll = 
senderBeanMgr.find(findBean);
-                       Iterator<SenderBean> it = coll.iterator();
-
-                       if (it.hasNext()) {
-                               SenderBean oldAckBean = (SenderBean) it.next();
-                               // If there is an old Ack. This Ack will be 
sent in the old timeToSend.
-                               timeToSend = oldAckBean.getTimeToSend(); 
-                               senderBeanMgr.delete(oldAckBean.getMessageID());
-                       }
-
-                       ackBean.setTimeToSend(timeToSend);
-
-                       
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-                       
-                       // passing the message through sandesha2sender
-                   SandeshaUtil.executeAndStore(ackRMMsgCtx, key, 
storageManager);
-
-                       // inserting the new Ack.
-                       senderBeanMgr.insert(ackBean);
-
-                       msgContext.pause();                     
+                       AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, 
sequenceId, timeToSend, storageManager);
                }
                
                if (log.isDebugEnabled())

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 Fri Apr  3 11:35:11 2009
@@ -178,58 +178,66 @@
        
                                // offered seq id
                                String offeredSequenceID = 
offer.getIdentifer().getIdentifier(); 
+
+                               //Need to see if this is a duplicate offer.
+                               //If it is we can't accept the offer as we 
can't be sure it has come from the same client.
+                               RMSBean finderBean = new RMSBean ();
+                               finderBean.setSequenceID(offeredSequenceID);
+                               RMSBean rMSBean = 
storageManager.getRMSBeanMgr().findUnique(finderBean);
+                               boolean offerAccepted = false;
+                               String outgoingSideInternalSequenceId = 
SandeshaUtil
+                                       
.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
+                               if(rMSBean != null){
+                                       if (log.isDebugEnabled())
+                                               log.debug("Duplicate offer so 
we can't accept as we can't be sure it's from the same client: " + 
offeredSequenceID);
+                                       offerAccepted = false;
+                               } else {
+                                       boolean isValidseqID = 
isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
+                                       offerAccepted = true;
                                
-                               boolean isValidseqID = 
isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
-                               boolean offerAccepted = true;
-                               
-                               RMSBean rMSBean = null;
-                               //Before processing this offer any further we 
need to perform some extra checks 
-                               //on the offered EP if WS-RM Spec 1.1 is being 
used
-                               if(isValidseqID && 
Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
-                                       Endpoint endpoint = offer.getEndpoint();
-                                       if (endpoint!=null) {
-                                               //Check to see if the offer 
endpoint has a value of WSA Anonymous
-                                               String addressingNamespace = 
(String) createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
-                                               String endpointAddress = 
endpoint.getEPR().getAddress();
-                                               
if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){
-                                                       //We will still accept 
this offer but we should warn the user that this MEP is not always reliable or 
efficient
-                                                       if 
(log.isDebugEnabled())
-                                                               log.debug("CSeq 
msg contains offer with an anonymous EPR");     
-                                                       
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning,
 createSeqRMMsg.getMessageContext().getMessageID(), 
-                                                                       
offeredSequenceID));
-                                               } 
-                                               
-                                               rMSBean = new RMSBean();
-                                               //Set the offered EP
-                                               
rMSBean.setOfferedEndPoint(endpointAddress);
+                                       //Before processing this offer any 
further we need to perform some extra checks 
+                                       //on the offered EP if WS-RM Spec 1.1 
is being used
+                                       if(isValidseqID && 
Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
+                                               Endpoint endpoint = 
offer.getEndpoint();
+                                               if (endpoint!=null) {
+                                                       //Check to see if the 
offer endpoint has a value of WSA Anonymous
+                                                       String 
addressingNamespace = (String) 
createSeqRMMsg.getProperty(AddressingConstants.WS_ADDRESSING_VERSION);
+                                                       String endpointAddress 
= endpoint.getEPR().getAddress();
+                                                       
if(SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace).equals(endpointAddress)){
+                                                               //We will still 
accept this offer but we should warn the user that this MEP is not always 
reliable or efficient
+                                                               if 
(log.isDebugEnabled())
+                                                                       
log.debug("CSeq msg contains offer with an anonymous EPR");     
+                                                               
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceMEPWarning,
 createSeqRMMsg.getMessageContext().getMessageID(), 
+                                                                               
offeredSequenceID));
+                                                       }
+                                                       rMSBean = new 
RMSBean();                                                        //Set the 
offered EP
+                                                       
rMSBean.setOfferedEndPoint(endpointAddress);
                                                
-                                       } else {
-                                               //Don't accept the offer
-                                               if (log.isDebugEnabled())
-                                                       log.debug("Offer 
Refused as it included a null endpoint");      
-                                               offerAccepted = false;
+                                               } else {
+                                                       //Don't accept the offer
+                                                       if 
(log.isDebugEnabled())
+                                                               
log.debug("Offer Refused as it included a null endpoint");      
+                                                       offerAccepted = false;
+                                               }
+                                       } else if (isValidseqID && 
Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
+                                               rMSBean = new RMSBean(); 
                                        }
-                               } else if (isValidseqID && 
Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
-                                       rMSBean = new RMSBean(); 
-                               }
-                               
-                               String outgoingSideInternalSequenceId = 
SandeshaUtil
-                               
.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
-                               
-                               if(isValidseqID){
-                                       // Setting the CreateSequence table 
entry for the outgoing
-                                       // side.
-                                       
rMSBean.setSequenceID(offeredSequenceID);       
-                                       
rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
-                                       // this is a dummy value
-                                       
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); 
-                                       
-                                       //Try inserting the new RMSBean
-                                       
if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
-                                               offerAccepted = false;
+                                       if(isValidseqID){
+                                               // Setting the CreateSequence 
table entry for the outgoing
+                                               // side.
+                                               
rMSBean.setSequenceID(offeredSequenceID);       
+                                               
rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
+                                               // this is a dummy value
+                                               
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID()); 
+                                       
+                                               //Try inserting the new RMSBean
+                                               
if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
+                                                       offerAccepted = false;
+                                               }
                                        }
                                }
-                               
+
                                if (offerAccepted) {
                                        if(rmdBean.getToEndpointReference() != 
null){
                                                
rMSBean.setToEndpointReference(rmdBean.getToEndpointReference());

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Fri Apr  3 11:35:11 2009
@@ -199,6 +199,7 @@
                                if 
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion()))
 {
                                        if(rmsBean.isPollingMode()) {
                                                rMDBean.setPollingMode(true);
+                                               
rMDBean.setReplyToEndpointReference(rmsBean.getReplyToEndpointReference());
                                        }
                                }
                                

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 Fri Apr  3 11:35:11 2009
@@ -63,9 +63,10 @@
                
msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID());
 
                //shedulling a polling request for the response side.
-               if (rmsBean.getOfferedSequence()!=null) {
+               String offeredSeq = rmsBean.getOfferedSequence();
+               if (offeredSeq!=null) {
                        RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
-                       RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
+                       RMDBean rMDBean = rMDBeanMgr.retrieve(offeredSeq);
                        
                        if (rMDBean!=null && rMDBean.isPollingMode()) {
                                PollingManager manager = 
storageManager.getPollingManager();

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
 Fri Apr  3 11:35:11 2009
@@ -153,17 +153,31 @@
                } else {
                        if (log.isDebugEnabled())
                                log.debug("Polling rms " + beanToPoll);
+                       
                        // The sequence is there, but we still only poll if we 
are expecting reply messages,
-                       // or if we don't have clean ack state. (We assume acks 
are clean, and only unset
+                       // if we don't have clean ack state or we are waiting 
for a terminateSeqResponse. (We assume acks are clean, and only unset
                        // this if we find evidence to the contrary).
                        boolean cleanAcks = true;
-                       if (beanToPoll.getNextMessageNumber() > -1)
+                       boolean waitingForTerminateSeqResponse = false;
+                       long repliesExpected = 0;
+                       
+                       if (!force && beanToPoll.getNextMessageNumber() > -1) {
                                cleanAcks = 
AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(),
 beanToPoll.getNextMessageNumber());
-                       long  repliesExpected = beanToPoll.getExpectedReplies();
+                               if(cleanAcks){
+                                       repliesExpected = 
beanToPoll.getExpectedReplies();
+                                       if(repliesExpected == 0){
+                                               //Need to check if we are 
waiting for a terminateSeqResponse
+                                               if(beanToPoll.isTerminated() == 
false && beanToPoll.isTerminateAdded() == true){
+                                                       
waitingForTerminateSeqResponse = true;
+                                               }
+                                       }
+                               }
+                       }
+
                        if(beanToPoll.getSequenceID() != null){
-                               if((force || !cleanAcks || repliesExpected > 0) 
&& beanToPoll.getReferenceMessageStoreKey() != null){
+                               if((force || !cleanAcks || repliesExpected > 0 
|| waitingForTerminateSeqResponse) && beanToPoll.getReferenceMessageStoreKey() 
!= null){
                                        
pollForSequence(beanToPoll.getAnonymousUUID(), 
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), 
beanToPoll, entry);
-                               }
+                               } 
                        } else {
                                //If seqID is null on RMS bean then it must be 
an RMSBean waiting for a createSeqResponse and we want to poll for these
                        pollForSequence(beanToPoll.getAnonymousUUID(), 
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), 
beanToPoll, entry);
@@ -233,7 +247,7 @@
                        wireSeqId = rmBean.getSequenceID(); //this case could 
make us non-RSP compliant
                }
                
-               if(log.isDebugEnabled()) log.debug("Debug: 
PollingManager::pollForSequence, wireAddress=" + wireAddress + ", wireSeqId=" + 
wireSeqId);
+               if(log.isDebugEnabled()) log.debug("Debug: 
PollingManager::pollForSequence, wireAddress=" + wireAddress + " wireSeqId=" + 
wireSeqId);
                
                MessageContext referenceMessage = 
storageManager.retrieveMessageContext(referenceMsgKey,context);
                if(referenceMessage!=null){

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/RMMsgCreator.java
 Fri Apr  3 11:35:11 2009
@@ -51,6 +51,7 @@
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.wsrm.Accept;
+import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.AcksTo;
 import org.apache.sandesha2.wsrm.CloseSequence;
 import org.apache.sandesha2.wsrm.CloseSequenceResponse;
@@ -141,33 +142,35 @@
                // Check if this service includes 2-way operations
                boolean twoWayService = false;
                AxisService service = applicationMsgContext.getAxisService();
-        if (service != null) {
-            // if the user has specified this sequence as a one way sequence 
it should not
-            // append the sequence offer.
-            if 
(!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty(
-                    SandeshaClientConstants.ONE_WAY_SEQUENCE))) {
-                Parameter p = 
service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS);
-                if (p != null && p.getValue() != null) {
-                    twoWayService = ((Boolean) p.getValue()).booleanValue();
-                    if (log.isDebugEnabled()) log.debug("RMMsgCreator:: 
twoWayService " + twoWayService);
-                }
-            }
-        }
+               if (service != null) {
+                       // if the user has specified this sequence as a one way 
sequence it should not
+                       // append the sequence offer.
+                       if 
(!JavaUtils.isTrue(applicationMsgContext.getOptions().getProperty(
+                               SandeshaClientConstants.ONE_WAY_SEQUENCE))) {
+                               Parameter p = 
service.getParameter(Sandesha2Constants.SERVICE_CONTAINS_OUT_IN_MEPS);
+                               if (p != null && p.getValue() != null) {
+                                       twoWayService = ((Boolean) 
p.getValue()).booleanValue();
+                                       if (log.isDebugEnabled()) 
log.debug("RMMsgCreator:: twoWayService " + twoWayService);
+                               }
+                       }
+               }
 
                // Adding sequence offer - if present. We send an offer if the 
client has assigned an
-               // id, or if we are using WS-RM 1.0 and the service contains 
out-in MEPs
-               boolean autoOffer = false;
-               
if(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmNamespaceValue)) {
-                       autoOffer = twoWayService;
-                        //There may not have been a way to confirm if an 
OUT_IN MEP is being used.
-                       //Therefore doing an extra check to see what Axis is 
using.  If it's OUT_IN then we must offer.
+               // id, or if the service contains out-in MEPs
+               boolean autoOffer = twoWayService;
+
+               //There may not have been a way to confirm if an OUT_IN MEP is 
being used.
+               //Therefore doing an extra check to see what Axis is using.  If 
it's OUT_IN then we must offer.
+               if(applicationMsgContext.getOperationContext() != null && 
applicationMsgContext.getOperationContext().getAxisOperation() != null){
                        
if(applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant()
 == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_IN
-                            || 
applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant()
 == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){
+                               || 
applicationMsgContext.getOperationContext().getAxisOperation().getAxisSpecificMEPConstant()
 == org.apache.axis2.wsdl.WSDLConstants.MEP_CONSTANT_OUT_OPTIONAL_IN){
                                autoOffer = true;
                        }
-               } else {
-                       // We also do some checking at this point to see if 
MakeConection is required to
-                       // enable WS-RM 1.1, and write a warning to the log if 
it has been disabled.
+               }
+
+               // We also do some checking at this point to see if 
MakeConection is required to
+               // enable WS-RM 1.1, and write a warning to the log if it has 
been disabled.
+               
if(Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) {
                        SandeshaPolicyBean policy = 
SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
                        if(twoWayService && !policy.isEnableMakeConnection()) {
                                String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.makeConnectionWarning);
@@ -187,26 +190,20 @@
                        Identifier identifier = new 
Identifier(rmNamespaceValue);
                        identifier.setIndentifer(offeredSequenceId);
                        offerPart.setIdentifier(identifier);
-                       createSequencePart.setSequenceOffer(offerPart);
                        
                        if 
(Sandesha2Constants.SPEC_2007_02.NS_URI.equals(rmNamespaceValue)) {
                                // We are going to send an offer, so decide 
which endpoint to include
                                EndpointReference offeredEndpoint = 
(EndpointReference) 
applicationMsgContext.getProperty(SandeshaClientConstants.OFFERED_ENDPOINT);
+                               //If the offeredEndpoint hasn't been set then 
use the acksTo of the RMSBean
                                if (offeredEndpoint==null) {
-                                       EndpointReference replyTo = 
applicationMsgContext.getReplyTo();  //using replyTo as the Endpoint if it is 
not specified
-                               
-                                       if (replyTo!=null) {
-                                               offeredEndpoint = 
SandeshaUtil.cloneEPR(replyTo);
-                                       }
-                               }
-                               // Finally fall back to using an anonymous 
endpoint
-                               if (offeredEndpoint==null) {
-                                       //The replyTo has already been set to a 
MC anon with UUID and so will use that same one for the offered endpoint  
-                                       offeredEndpoint = 
rmsBean.getReplyToEndpointReference();
+                                       offeredEndpoint = 
rmsBean.getAcksToEndpointReference();
                                }
+                               
                                Endpoint endpoint = new Endpoint 
(offeredEndpoint, rmNamespaceValue, addressingNamespace);
                                offerPart.setEndpoint(endpoint);
                        }
+                       
+                       createSequencePart.setSequenceOffer(offerPart);
                }
 
                EndpointReference toEPR = rmsBean.getToEndpointReference();
@@ -549,6 +546,45 @@
                if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) 
                        log.debug("Exit: RMMsgCreator::addAckMessage " + 
applicationMsg);
        }
+
+       /**
+        * Adds an AckRequested header for a specific sequence to the given
+        * application message, so the ack request is piggybacked on it.
+        *
+        * The header is created in the RM namespace resolved from the RM
+        * version reported by rmsBean, carries sequenceId as its identifier
+        * and is flagged mustUnderstand.
+        *
+        * Side effects on applicationMsg beyond the added header:
+        * - if getWSAAction() returns null, the action and SOAP action are
+        *   set to the AckRequest values for the RM version; an action that
+        *   is already present is left as it is;
+        * - if getMessageId() returns null, a new id from
+        *   SandeshaUtil.getUUID() is assigned;
+        * - the underlying message context is passed to
+        *   secureOutboundMessage together with rmsBean.
+        *
+        * @param applicationMsg The message to which the AckRequested header
+        *        will be added; it is modified in place
+        * @param sequenceId The sequence for which the ack is requested
+        * @param rmsBean Supplies the RM version and is handed on to
+        *        secureOutboundMessage; it is dereferenced without a null
+        *        check, so callers must not pass null
+        * @throws SandeshaException declared by this method; which of the
+        *         calls made here raises it is not visible in this method
+        *         (presumably secureOutboundMessage - confirm)
+        */
+       public static void addAckRequest(RMMsgContext applicationMsg, String 
sequenceId, RMSBean rmsBean)
+                       throws SandeshaException {
+               if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+                       log.debug("Entry: RMMsgCreator::addAckRequest " + 
sequenceId);
+               
+               String rmVersion = rmsBean.getRMVersion();
+               String rmNamespaceValue = 
SpecSpecificConstants.getRMNamespaceValue(rmVersion);
+               
+               AckRequested ackRequest = new AckRequested(rmNamespaceValue);   
+               
+               Identifier id = new Identifier(rmNamespaceValue);
+               id.setIndentifer(sequenceId);
+               ackRequest.setIdentifier(id);
+               ackRequest.setMustUnderstand(true);
+               applicationMsg.addAckRequested(ackRequest);
+
+               if (applicationMsg.getWSAAction()==null) {
+                       
applicationMsg.setAction(SpecSpecificConstants.getAckRequestAction(rmVersion));
+                       
applicationMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction(rmVersion));
+               }
+               
+               if(applicationMsg.getMessageId() == null) {
+                       applicationMsg.setMessageId(SandeshaUtil.getUUID());
+               }
+                               
+               // Ensure the message also contains the token that needs to be 
used
+               secureOutboundMessage(rmsBean, 
applicationMsg.getMessageContext());
+                       
+               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) 
+                       log.debug("Exit: RMMsgCreator::addAckRequest " + 
applicationMsg);
+       }
        
        
        public static RMMsgContext createMakeConnectionMessage (RMMsgContext 
referenceRMMessage,

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 Fri Apr  3 11:35:11 2009
@@ -232,6 +232,17 @@
                        
                        transaction = storageManager.getTransaction();
 
+                       //If this is an application msg we need to add an 
ackRequest to the header              
+                       if(messageType == 
Sandesha2Constants.MessageTypes.APPLICATION){                         
+                               //Add an ackRequest                             
+                               RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, 
senderBean.getSequenceID());                            
+                               RMMsgCreator.addAckRequest(rmMsgCtx, 
senderBean.getSequenceID(), rmsBean);                                           
                   
+                               if (transaction != null && 
transaction.isActive())                                      
+                                       transaction.commit();
+                               
+                               transaction = storageManager.getTransaction();  
                
+                       }                                               
+                       
                        //if this is a sync RM exchange protocol we always have 
to add an ack
                        boolean ackPresent = false;
                        Iterator it = rmMsgCtx.getSequenceAcknowledgements();
@@ -628,21 +639,22 @@
                                int responseMessageType = 
responseRMMessage.getMessageType();
                                if(log.isDebugEnabled()) 
log.debug("inboundMsgType" + responseMessageType + "outgoing message type " + 
messageType);
                                                                
-                               //if this is an application response msg in 
response to a make connection then we have to take care with the service context
-                               if ((messageType == 
Sandesha2Constants.MessageTypes.APPLICATION && responseMessageType == 
Sandesha2Constants.MessageTypes.APPLICATION)
-                                       || responseMessageType != 
Sandesha2Constants.MessageTypes.APPLICATION) {
-                                       if(log.isDebugEnabled()) 
log.debug("setting service ctx on msg as this is NOT a 
makeConnection>appResponse exchange pattern");
-                                       
responseMessageContext.setServiceContext(msgCtx.getServiceContext());
-                               }
-                               else{
-                                        //Setting the AxisService object
+                               //if this is an application response or 
createSeqResponse msg in response to a make connection then we have to take 
care with the service context
+                               if(messageType == 
Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG 
+                                               && (responseMessageType == 
Sandesha2Constants.MessageTypes.APPLICATION 
+                                                               || 
responseMessageType == Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE)){
+                               
+                                       //Setting the AxisService object
                                        
responseMessageContext.setAxisService(msgCtx.getAxisService());
 
-                                       //we cannot set service ctx for 
application response msgs since the srvc ctx will not match the op ctx, causing
+                                       //we cannot set service ctx for 
application response msgs or createSeqResponse msgs since the srvc ctx will not 
match the op ctx, causing
                                        //problems with addressing
                                        if(log.isDebugEnabled()) log.debug("NOT 
setting service ctx for response type " + messageType + ", current srvc ctx =" 
+ responseMessageContext.getServiceContext());
+                               }else {
+                                       if(log.isDebugEnabled()) 
log.debug("setting service ctx on msg as this is NOT a 
makeConnection>appResponse or makeConnection>createSeqResponse exchange 
pattern");
+                                       
responseMessageContext.setServiceContext(msgCtx.getServiceContext());
                                }
-                               
+       
                                //If addressing is disabled we will be adding 
this message simply as the application response of the request message.
                                Boolean addressingDisabled = (Boolean) 
msgCtx.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES);
                                if (addressingDisabled!=null && 
Boolean.TRUE.equals(addressingDisabled)) {

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java?rev=761627&r1=761626&r2=761627&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/wsrm/Endpoint.java
 Fri Apr  3 11:35:11 2009
@@ -19,6 +19,8 @@
 
 package org.apache.sandesha2.wsrm;
 
+import java.util.Iterator;
+
 import javax.xml.namespace.QName;
 
 import org.apache.axiom.om.OMElement;
@@ -72,6 +74,15 @@
                        throw new SandeshaException (message);
                }
                
+               // Sniff the addressing namespace from the Address child of the 
endpointElement
+               Iterator children = endpointElement.getChildElements();
+               while(children.hasNext() && addressingNamespaceValue == null) {
+                       OMElement child = (OMElement) children.next();
+                       if("Address".equals(child.getLocalName())) {
+                               addressingNamespaceValue = 
child.getNamespace().getNamespaceURI();
+                       }
+               }
+               
                return this;
        }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscr...@ws.apache.org
For additional commands, e-mail: sandesha-dev-h...@ws.apache.org

Reply via email to