Author: mlovett
Date: Wed Feb 28 07:21:17 2007
New Revision: 512799

URL: http://svn.apache.org/viewvc?view=rev&rev=512799
Log:
Rework acks so that we piggyback based on the To EPR, and avoid storing ack 
messages when we know that we can piggyback later

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 Wed Feb 28 07:21:17 2007
@@ -18,8 +18,6 @@
 package org.apache.sandesha2.handlers;
 
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisService;
@@ -29,23 +27,17 @@
 import org.apache.sandesha2.MessageValidator;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.msgprocessors.AckRequestedProcessor;
 import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor;
 import org.apache.sandesha2.msgprocessors.MessagePendingProcessor;
 import org.apache.sandesha2.msgprocessors.SequenceProcessor;
-import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
-import org.apache.sandesha2.storage.beans.RMDBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.wsrm.Sequence;
 
 /**
  * This is invoked in the inFlow of an RM endpoint. This is responsible for
@@ -160,90 +152,4 @@
                return returnValue;
        }
        
-       
-       public void flowComplete(MessageContext msgContext) {
-               super.flowComplete(msgContext);
-               
-               Transaction transaction = null;
-               try {
-                       //if in order is not enabled and server side and this 
is an application message
-                       
-                       //check the replyTo address
-                       //check the AcksTo address of the incoming sequence
-
-//                     if (replyTo is anonymous and this is not an InOnly 
message)
-//                             add an HOLD response property
-//                             SUSPEND the execution
-//                             Sender will attach a sync response using the 
RequestResponseTransport object.
-//                     else  (if acksTo is anonymous AND no response message 
has been added)
-//                             send an ack to the back channel now.
-                       
-                       ConfigurationContext configurationContext = 
msgContext.getConfigurationContext();
-                       StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext, 
-                                                                               
                                                                                
   configurationContext.getAxisConfiguration());
-                       
-                       transaction = storageManager.getTransaction();
-                       
-                       RMMsgContext rmMsgContext = 
MsgInitializer.initializeMessage(msgContext);
-                       
-                       SandeshaPolicyBean policyBean = 
SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
-
-                       boolean inOrder= policyBean.isInOrder();
-                       
-                       if (msgContext.isServerSide() && !inOrder && 
rmMsgContext.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) {
-                               
-                               Sequence sequence = (Sequence) 
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-                               String sequenceId = 
sequence.getIdentifier().getIdentifier();
-                               
-                               RMDBeanMgr rmdBeanMgr = 
storageManager.getRMDBeanMgr();
-                               
-                               RMDBean findBean = new RMDBean ();
-                               findBean.setSequenceID(sequenceId);
-                               RMDBean rmdBean = 
rmdBeanMgr.findUnique(findBean);
-
-                               if (rmdBean==null) {
-                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rmdBeanNotFound,sequenceId);
-                                       throw new SandeshaException (message);
-                               }
-                               
-                               String acksToAddress = rmdBean.getAcksToEPR();
-                               
-                               EndpointReference acksTo = new 
EndpointReference (acksToAddress);
-                               
-                               if (acksTo!=null && 
acksTo.hasAnonymousAddress()) {
-                                       
-                                       Object responseWritten = 
msgContext.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
-                                       if (responseWritten==null || 
!Constants.VALUE_TRUE.equals(responseWritten)) {
-                                               RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgContext, rmdBean, sequenceId, 
storageManager, true);
-                                               
msgContext.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
 Constants.VALUE_TRUE);
-                                               
AcknowledgementManager.sendAckNow(ackRMMsgContext);
-                                       }
-                                       
-                               }
-                       }
-               } catch (AxisFault e) {
-                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.exceptionInFlowCompletion);
-                       log.error(message, e);
-                       
-                       if (transaction != null) {
-                               try {
-                                       transaction.rollback();
-                                       transaction = null;
-                               } catch (Exception e1) {
-                                       message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, 
e1.toString());
-                                       log.debug(message, e);
-                               }
-                       }
-               } finally {
-                       if (transaction != null) {
-                               try {
-                                       transaction.commit();
-                               } catch (Exception e) {
-                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
-                                       log.debug(message, e);
-                               }
-                       }
-               }
-       }
-
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Wed Feb 28 07:21:17 2007
@@ -146,7 +146,6 @@
        public static final String 
cannotFindAddressElement="cannotFindAddressElement";
        public static final String 
cannotFindAddressText="cannotFindAddressText";
        public static final String nullPassedElement="nullPassedElement";
-       public static final String 
invalidAckMessageEntry="invalidAckMessageEntry";
        public static final String seqPartIsNull="seqPartIsNull";
        public static final String 
incomingSequenceNotValidID="incomingSequenceNotValidID";
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 Wed Feb 28 07:21:17 2007
@@ -157,7 +157,6 @@
 cannotFindAddressElement=Cannot find an ''Address'' part in the given element 
{0}.
 cannotFindAddressText=The passed element {0} does not have a valid address 
text.
 nullPassedElement=The passed element is null.
-invalidAckMessageEntry=Invalid ack message entry: {0}.
 seqPartIsNull=Sequence part is null.
 incomingSequenceNotValidID=The ID for the incoming sequence is not valid: 
''{0}''.
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Wed Feb 28 07:21:17 2007
@@ -174,7 +174,7 @@
 
                ackMsgCtx.setTo(acksTo);
                ackMsgCtx.setReplyTo(msgContext.getTo());
-               RMMsgCreator.addAckMessage(ackRMMsgCtx,rmdBean, sequenceId, 
rmdBean);
+               RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean);
                ackRMMsgCtx.getMessageContext().setServerSide(true);
 
                if (acksTo.hasAnonymousAddress()) {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 Wed Feb 28 07:21:17 2007
@@ -20,8 +20,6 @@
 import java.util.Iterator;
 
 import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.ContextFactory;
@@ -43,7 +41,6 @@
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.WSRMMessageSender;
@@ -102,24 +99,7 @@
                storageManager.getRMDBeanMgr().update(rmdBean);
 
                RMMsgContext ackRMMsgCtx = 
AcknowledgementManager.generateAckMessage(rmMsgCtx, rmdBean, sequenceId, 
storageManager, true);
-
-               MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-
-               String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
-               ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
-
-               SOAPFactory factory = 
SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
-                               .getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
-
-               // Setting new envelope
-               SOAPEnvelope envelope = factory.getDefaultEnvelope();
-               try {
-                       ackMsgCtx.setEnvelope(envelope);
-               } catch (AxisFault e3) {
-                       throw new SandeshaException(e3.getMessage());
-               }
-
-               // adding the ack part to the envelope.
+               // adding the ack part(s) to the envelope.
                Iterator sequenceAckIter = ackRMMsgCtx
                                
.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Wed Feb 28 07:21:17 2007
@@ -303,13 +303,9 @@
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new
 Long (msgNo));
                
-//             adding of acks
-               
-//             if acksTo anonymous 
-//                     add an ack entry with an infinite ack interval so that 
it will be piggybacked by any possible response message
-//             else 
-//                     add an ack entry here
-               
+               // We only create an ack message if:
+               // - We have anonymous acks, and the backchannel is free
+               // - We have async acks
                boolean backchannelFree = (replyTo != null && 
!replyTo.hasAnonymousAddress()) ||
                                                                        
WSDLConstants.MEP_CONSTANT_IN_ONLY == mep;
                EndpointReference acksTo = new EndpointReference 
(bean.getAcksToEPR());
@@ -320,18 +316,13 @@
                                
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
 Constants.VALUE_TRUE);
                                
AcknowledgementManager.sendAckNow(ackRMMsgContext);
                        }
-               } else { //Scenario 2 and Scenario 3
-                       //              having a negative value for timeToSend 
will make this behave as having an infinite ack interval.
-                       long timeToSend = -1;   
-                       if (!acksTo.hasAnonymousAddress()) {
-                               SandeshaPolicyBean policyBean = 
SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
-                               long ackInterval = 
policyBean.getAcknowledgementInterval();
-                               timeToSend = System.currentTimeMillis() + 
ackInterval;
-                       }
+               } else if (!acksTo.hasAnonymousAddress()) {
+                       SandeshaPolicyBean policyBean = 
SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
+                       long ackInterval = 
policyBean.getAcknowledgementInterval();
+                       long timeToSend = System.currentTimeMillis() + 
ackInterval;
                        
                        RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, 
storageManager,true);
 
-                       AcknowledgementManager.removeAckBeanEntries(sequenceId, 
storageManager);
                        AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, 
sequenceId, timeToSend, storageManager);
                }
                

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Wed Feb 28 07:21:17 2007
@@ -315,18 +315,11 @@
                RMMsgContext ackRMMessage = 
AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, rmdBean, 
                                sequenceId,     storageManager, true);
                
+               // copy over the ack parts
                Iterator iter = 
ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-               
-               if (iter.hasNext()) {
+               while (iter.hasNext()) {
                        SequenceAcknowledgement seqAck = 
(SequenceAcknowledgement) iter.next();
-                       if (seqAck==null) {
-                               String message = "No SequenceAcknowledgement 
part is present";
-                               throw new SandeshaException (message);
-                       }
-               
                        
terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
 seqAck);
-               } else {
-                       //TODO 
                }
                
                terminateSeqResponseRMMsg.addSOAPEnvelope();

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
 Wed Feb 28 07:21:17 2007
@@ -19,14 +19,9 @@
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 
-import javax.xml.namespace.QName;
-
-import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.SOAPHeader;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
@@ -46,10 +41,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
-import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 
 /**
  * Contains logic for managing acknowledgements.
@@ -71,29 +63,31 @@
                if (log.isDebugEnabled())
                        log.debug("Enter: 
AcknowledgementManager::piggybackAcksIfPresent");
                
-               ConfigurationContext configurationContext = 
rmMessageContext.getConfigurationContext();
                SenderBeanMgr retransmitterBeanMgr = 
storageManager.getSenderBeanMgr();
 
-               // If this message is going to an anonymous address then we add 
in an ack for the
-               // sequence that was used on the inbound side.
+               // If this message is going to an anonymous address, and the 
inbound sequence has
+               // anonymous acksTo, then we add in an ack for the inbound 
sequence.
                EndpointReference target = rmMessageContext.getTo();
                if(target.hasAnonymousAddress()) {
-                       Sequence sequence = (Sequence) 
rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-                       if(sequence != null) {
-                               String outboundSequenceId = 
sequence.getIdentifier().getIdentifier();
-                               RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outboundSequenceId);
-                               String outboundInternalSeq = 
rmsBean.getInternalSequenceID();
-                               String inboundSequenceId = 
SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(outboundInternalSeq);
-                               RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequenceId);
-                               if (rmdBean != null) {
-                                       if(log.isDebugEnabled()) 
log.debug("Piggybacking ack for " + inboundSequenceId);
-                                       
RMMsgCreator.addAckMessage(rmMessageContext, rmsBean, inboundSequenceId, 
rmdBean);
+                       String inboundSequence = (String) 
rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+                       if(inboundSequence != null) {
+                               RMDBean inboundBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
+                               if(inboundBean != null) {
+                                       String acksTo = 
inboundBean.getAcksToEPR();
+                                       EndpointReference acksToEPR = new 
EndpointReference(acksTo);
+                                       
+                                       if(acksTo == null || 
acksToEPR.hasAnonymousAddress()) {
+                                               if(log.isDebugEnabled()) 
log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
+                                               
RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean);
+                                       }
                                }
                        }
                        if(log.isDebugEnabled()) log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, anon");
                        return;
                }
                
+               // From here on, we must be dealing with a real address. 
Piggyback all sequences that have an
+               // acksTo that matches the To address, and that have an 
ackMessage queued up for sending.
                SenderBean findBean = new SenderBean();
                findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
                findBean.setSend(true);
@@ -101,67 +95,26 @@
 
                Collection collection = retransmitterBeanMgr.find(findBean);
                Iterator it = collection.iterator();
-
                while (it.hasNext()) {
                        SenderBean ackBean = (SenderBean) it.next();
 
+                       // Piggybacking will happen only if the end of ack 
interval (timeToSend) is not reached.
                        long timeNow = System.currentTimeMillis();
                        if (ackBean.getTimeToSend() > timeNow) {
-                               // //Piggybacking will happen only if the end 
of ack interval
-                               // (timeToSend) is not reached.
-
-                               MessageContext ackMsgContext = 
storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(),
-                                               configurationContext);
-
-                               if (log.isDebugEnabled()) log.debug("Adding ack 
headers");
-
-                               // deleting the ack entry.
+                               // Delete the beans that would have sent the ack
                                
retransmitterBeanMgr.delete(ackBean.getMessageID());
+                               
storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
 
-                               // Adding the ack(s) to the application message
-                               boolean acks = false;
-                               MessageContext messageContext = 
rmMessageContext.getMessageContext();
-                               SOAPHeader appMsgHeaders = 
messageContext.getEnvelope().getHeader();
-                               
-                               // If the App message doesn't have a SOAP 
Header, create one here.
-                               if (appMsgHeaders == null) {
-                                       SOAPFactory factory = (SOAPFactory) 
messageContext.getEnvelope().getOMFactory();
-                                       appMsgHeaders = 
factory.createSOAPHeader(messageContext.getEnvelope());
-                               }
-                                                                       
-                               SOAPHeader headers = 
ackMsgContext.getEnvelope().getHeader();
-                               if(headers != null) {
-                                       for(int i = 0; i < 
Sandesha2Constants.SPEC_NS_URIS.length; i++) {
-
-                                               QName name = new 
QName(Sandesha2Constants.SPEC_NS_URIS[i], 
Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
-                                               Iterator iter = 
headers.getChildrenWithName(name);
-                                               while(iter.hasNext()) {
-                                                       OMElement ackElement = 
(OMElement) iter.next();
-
-                                                       SequenceAcknowledgement 
sequenceAcknowledgement = new SequenceAcknowledgement 
(Sandesha2Constants.SPEC_NS_URIS[i]);
-                                                       
sequenceAcknowledgement.fromOMElement(ackElement);
-                                                       
-                                                       
sequenceAcknowledgement.toOMElement(appMsgHeaders);
-                                                       acks = true;
-                                                       
-                                                       // Make sure that the 
outbound message is secured with the token that
-                                                       // matches the ack.
-                                                       String seqId = 
sequenceAcknowledgement.getIdentifier().getIdentifier();
-                                                       RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, seqId);
-                                                       
RMMsgCreator.secureOutboundMessage(rmdBean, messageContext);
-                                               }
-                                       }
-                               }
-                               
-                               if (!acks) {
-                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckMessageEntry,
-                                                       
ackMsgContext.getEnvelope().toString());
-                                       log.debug(message);
-                                       throw new SandeshaException(message);
+                               String sequenceId = ackBean.getSequenceID();
+                               if (log.isDebugEnabled()) 
log.debug("Piggybacking ack for sequence: " + sequenceId);
+
+                               RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+                               if(rmdBean != null) {
+                                       
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean);
                                }
                        }
                }
-
+               
                if (log.isDebugEnabled())
                        log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent");
        }
@@ -224,7 +177,7 @@
                ackMsgCtx.setServerSide(serverSide);
 
                // adding the SequenceAcknowledgement part.
-               RMMsgCreator.addAckMessage(ackRMMsgCtx, rmdBean ,sequenceId, 
rmdBean);
+               RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
AcknowledgementManager::generateAckMessage");
@@ -245,37 +198,19 @@
                }
 
                if (log.isDebugEnabled())
-                       log.debug("Enter: 
AcknowledgementManager::verifySequenceCompletion " + result);
+                       log.debug("Exit: 
AcknowledgementManager::verifySequenceCompletion " + result);
                return result;
        }
        
-       public static void addFinalAcknowledgement () {
-               
-       }
-       
-       public static void removeAckBeanEntries (String sequenceId, 
StorageManager storageManager) throws SandeshaException {
-               SenderBean findBean = new SenderBean ();
-               findBean.setSequenceID(sequenceId);
-               findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-               
-               SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-               List senderBeans = senderBeanMgr.find(findBean);
-               
-               for (Iterator it = senderBeans.iterator();it.hasNext();) {
-                       SenderBean bean = (SenderBean) it.next();
-                       senderBeanMgr.delete(bean.getMessageID());
-               }
-               
-       }
-       
        public static void addAckBeanEntry (
                        RMMsgContext ackRMMsgContext,
                        String sequenceId, 
                        long timeToSend,
                        StorageManager storageManager) throws AxisFault {
+               if(log.isDebugEnabled()) log.debug("Enter: 
AcknowledgementManager::addAckBeanEntry");
 
-               // / Transaction asyncAckTransaction =
-               // storageManager.getTransaction();
+               // Write the acks into the envelope
+               ackRMMsgContext.addSOAPEnvelope();
                
                MessageContext ackMsgContext = 
ackRMMsgContext.getMessageContext();
 
@@ -297,28 +232,19 @@
 
                ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
 
-
-               // Ack will be sent as stand alone, only after the retransmitter
-               // interval.
-//             long timeToSend = System.currentTimeMillis() + ackInterval;
-
                // removing old acks.
                SenderBean findBean = new SenderBean();
                findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
-               // this will be set to true in the sandesha2TransportSender.
                findBean.setSend(true);
                findBean.setReSend(false);
+               findBean.setSequenceID(sequenceId);
                Collection coll = retransmitterBeanMgr.find(findBean);
                Iterator it = coll.iterator();
 
-               if (it.hasNext()) {
+               while(it.hasNext()) {
                        SenderBean oldAckBean = (SenderBean) it.next();
-                       timeToSend = oldAckBean.getTimeToSend(); // If there is an
-                                                                                                               // old ack. This ack
-                                                                                                               // will be sent in
-                                                                                                               // the old
-                                                                                                               // timeToSend.
+                       if(oldAckBean.getTimeToSend() < timeToSend)
+                               timeToSend = oldAckBean.getTimeToSend();
 
                        // removing the retransmitted entry for the oldAck
                        retransmitterBeanMgr.delete(oldAckBean.getMessageID());
@@ -331,8 +257,6 @@
 
                ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
                
-               //asyncAckTransaction.commit();
-
                // passing the message through sandesha2sender
                ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
                
@@ -341,12 +265,16 @@
                // inserting the new ack.
                retransmitterBeanMgr.insert(ackBean);
 
+               if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::addAckBeanEntry");
        }
        
        public static void sendAckNow (RMMsgContext ackRMMsgContext) throws AxisFault {
                if (log.isDebugEnabled())
                        log.debug("Enter: AcknowledgementManager::sendAckNow");
 
+               // Write the acks into the envelope
+               ackRMMsgContext.addSOAPEnvelope();
+               
                MessageContext ackMsgContext = ackRMMsgContext.getMessageContext();
                ConfigurationContext configContext = ackMsgContext.getConfigurationContext();
                

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Wed Feb 28 07:21:17 2007
@@ -22,6 +22,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
@@ -398,15 +399,13 @@
         * @param sequenceId
         * @throws SandeshaException
         */
-       public static void addAckMessage(RMMsgContext applicationMsg, RMSequenceBean rmBean,
-                       String sequenceId, RMDBean rmdBean)
+       public static void addAckMessage(RMMsgContext applicationMsg, String sequenceId, RMDBean rmdBean)
                        throws SandeshaException {
                if(log.isDebugEnabled())
                        log.debug("Entry: RMMsgCreator::addAckMessage " + sequenceId);
                
-               SOAPEnvelope envelope = applicationMsg.getSOAPEnvelope();
-
-               String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmBean.getRMVersion());
+               String rmVersion = rmdBean.getRMVersion();
+               String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
 
                SequenceAcknowledgement sequenceAck = new SequenceAcknowledgement(rmNamespaceValue);
                Identifier id = new Identifier(rmNamespaceValue);
@@ -418,7 +417,7 @@
 
                if (rmdBean.isClosed()) {
                        // sequence is closed. so add the 'Final' part.
-                       if (SpecSpecificConstants.isAckFinalAllowed(rmBean.getRMVersion())) {
+                       if (SpecSpecificConstants.isAckFinalAllowed(rmVersion)) {
                                AckFinal ackFinal = new AckFinal(rmNamespaceValue);
                                sequenceAck.setAckFinal(ackFinal);
                        }
@@ -426,20 +425,20 @@
 
                applicationMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, sequenceAck);
 
-               sequenceAck.toOMElement(envelope.getHeader());
-               
                if (applicationMsg.getWSAAction()==null) {
-                       applicationMsg.setAction(SpecSpecificConstants.getSequenceAcknowledgementAction(rmBean.getRMVersion()));
-                       applicationMsg.setSOAPAction(SpecSpecificConstants.getSequenceAcknowledgementSOAPAction(rmBean.getRMVersion()));
+                       applicationMsg.setAction(SpecSpecificConstants.getSequenceAcknowledgementAction(rmVersion));
+                       applicationMsg.setSOAPAction(SpecSpecificConstants.getSequenceAcknowledgementSOAPAction(rmVersion));
+               }
+               if(applicationMsg.getMessageId() == null) {
+                       applicationMsg.setMessageId(SandeshaUtil.getUUID());
                }
                
-               applicationMsg.setMessageId(SandeshaUtil.getUUID());
-               
-               //generating the SOAP envelope.
+               // Write the ack into the soap envelope
                try {
                        applicationMsg.addSOAPEnvelope();
                } catch(AxisFault e) {
-                       throw new SandeshaException(e);
+                       if(log.isDebugEnabled()) log.debug("Caught AxisFault", e);
+                       throw new SandeshaException(e.getMessage(), e);
                }
                
                // Ensure the message also contains the token that needs to be used

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=512799&r1=512798&r2=512799
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Wed Feb 28 07:21:17 2007
@@ -161,12 +161,8 @@
 
                        int messageType = senderBean.getMessageType();
                        
-                       if (isAckPiggybackableMsgType(messageType)) { // checking weather this message can carry piggybacked acks
-                               // checking weather this message can carry piggybacked acks
-                               // piggybacking if an ack if available for the same
-                               // sequence.
-                               // TODO do piggybacking based on wsa:To
-                                       
+                       if (isAckPiggybackableMsgType(messageType)) {
+                               // Piggyback ack messages based on the 'To' address of the message
                                AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager);
                        }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to