Author: mlovett
Date: Wed Feb 28 07:21:17 2007
New Revision: 512799

URL: http://svn.apache.org/viewvc?view=rev&rev=512799
Log:
Rework acks so that we piggyback based on the To EPR, and avoid storing ack messages when we know that we can piggyback later
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Wed Feb 28 07:21:17 2007 @@ -18,8 +18,6 @@ package org.apache.sandesha2.handlers; import org.apache.axis2.AxisFault; -import org.apache.axis2.Constants; -import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.axis2.description.AxisService; @@ -29,23 +27,17 @@ import org.apache.sandesha2.MessageValidator; import org.apache.sandesha2.RMMsgContext; import org.apache.sandesha2.Sandesha2Constants; -import 
org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.i18n.SandeshaMessageHelper; import org.apache.sandesha2.i18n.SandeshaMessageKeys; import org.apache.sandesha2.msgprocessors.AckRequestedProcessor; import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor; import org.apache.sandesha2.msgprocessors.MessagePendingProcessor; import org.apache.sandesha2.msgprocessors.SequenceProcessor; -import org.apache.sandesha2.policy.SandeshaPolicyBean; import org.apache.sandesha2.storage.StorageManager; import org.apache.sandesha2.storage.Transaction; -import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr; -import org.apache.sandesha2.storage.beans.RMDBean; -import org.apache.sandesha2.util.AcknowledgementManager; import org.apache.sandesha2.util.FaultManager; import org.apache.sandesha2.util.MsgInitializer; import org.apache.sandesha2.util.SandeshaUtil; -import org.apache.sandesha2.wsrm.Sequence; /** * This is invoked in the inFlow of an RM endpoint. This is responsible for @@ -160,90 +152,4 @@ return returnValue; } - - public void flowComplete(MessageContext msgContext) { - super.flowComplete(msgContext); - - Transaction transaction = null; - try { - //if in order is not enabled and server side and this is an application message - - //check the replyTo address - //check the AcksTo address of the incoming sequence - -// if (replyTo is anonymous and this is not an InOnly message) -// add an HOLD response property -// SUSPEND the execution -// Sender will attach a sync response using the RequestResponseTransport object. -// else (if acksTo is anonymous AND no response message has been added) -// send an ack to the back channel now. 
- - ConfigurationContext configurationContext = msgContext.getConfigurationContext(); - StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext, - configurationContext.getAxisConfiguration()); - - transaction = storageManager.getTransaction(); - - RMMsgContext rmMsgContext = MsgInitializer.initializeMessage(msgContext); - - SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation()); - - boolean inOrder= policyBean.isInOrder(); - - if (msgContext.isServerSide() && !inOrder && rmMsgContext.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION) { - - Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); - String sequenceId = sequence.getIdentifier().getIdentifier(); - - RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr(); - - RMDBean findBean = new RMDBean (); - findBean.setSequenceID(sequenceId); - RMDBean rmdBean = rmdBeanMgr.findUnique(findBean); - - if (rmdBean==null) { - String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rmdBeanNotFound,sequenceId); - throw new SandeshaException (message); - } - - String acksToAddress = rmdBean.getAcksToEPR(); - - EndpointReference acksTo = new EndpointReference (acksToAddress); - - if (acksTo!=null && acksTo.hasAnonymousAddress()) { - - Object responseWritten = msgContext.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN); - if (responseWritten==null || !Constants.VALUE_TRUE.equals(responseWritten)) { - RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgContext, rmdBean, sequenceId, storageManager, true); - msgContext.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE); - AcknowledgementManager.sendAckNow(ackRMMsgContext); - } - - } - } - } catch (AxisFault e) { - String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.exceptionInFlowCompletion); - log.error(message, e); - - if 
(transaction != null) { - try { - transaction.rollback(); - transaction = null; - } catch (Exception e1) { - message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString()); - log.debug(message, e); - } - } - } finally { - if (transaction != null) { - try { - transaction.commit(); - } catch (Exception e) { - String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString()); - log.debug(message, e); - } - } - } - } - } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Wed Feb 28 07:21:17 2007 @@ -146,7 +146,6 @@ public static final String cannotFindAddressElement="cannotFindAddressElement"; public static final String cannotFindAddressText="cannotFindAddressText"; public static final String nullPassedElement="nullPassedElement"; - public static final String invalidAckMessageEntry="invalidAckMessageEntry"; public static final String seqPartIsNull="seqPartIsNull"; public static final String incomingSequenceNotValidID="incomingSequenceNotValidID"; Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original) +++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Wed Feb 28 07:21:17 2007 @@ -157,7 +157,6 @@ cannotFindAddressElement=Cannot find an ''Address'' part in the given element {0}. cannotFindAddressText=The passed element {0} does not have a valid address text. nullPassedElement=The passed element is null. -invalidAckMessageEntry=Invalid ack message entry: {0}. seqPartIsNull=Sequence part is null. incomingSequenceNotValidID=The ID for the incoming sequence is not valid: ''{0}''. Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Wed Feb 28 07:21:17 2007 @@ -174,7 +174,7 @@ ackMsgCtx.setTo(acksTo); ackMsgCtx.setReplyTo(msgContext.getTo()); - RMMsgCreator.addAckMessage(ackRMMsgCtx,rmdBean, sequenceId, rmdBean); + RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean); ackRMMsgCtx.getMessageContext().setServerSide(true); if (acksTo.hasAnonymousAddress()) { Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original) +++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Wed Feb 28 07:21:17 2007 @@ -20,8 +20,6 @@ import java.util.Iterator; import org.apache.axiom.om.OMElement; -import org.apache.axiom.soap.SOAPEnvelope; -import org.apache.axiom.soap.SOAPFactory; import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.ContextFactory; @@ -43,7 +41,6 @@ import org.apache.sandesha2.util.AcknowledgementManager; import org.apache.sandesha2.util.FaultManager; import org.apache.sandesha2.util.RMMsgCreator; -import org.apache.sandesha2.util.SOAPAbstractFactory; import org.apache.sandesha2.util.SandeshaUtil; import org.apache.sandesha2.util.SpecSpecificConstants; import org.apache.sandesha2.util.WSRMMessageSender; @@ -102,24 +99,7 @@ storageManager.getRMDBeanMgr().update(rmdBean); RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, rmdBean, sequenceId, storageManager, true); - - MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext(); - - String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue(); - ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue); - - SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil - .getSOAPVersion(rmMsgCtx.getSOAPEnvelope())); - - // Setting new envelope - SOAPEnvelope envelope = factory.getDefaultEnvelope(); - try { - ackMsgCtx.setEnvelope(envelope); - } catch (AxisFault e3) { - throw new SandeshaException(e3.getMessage()); - } - - // adding the ack part to the envelope. + // adding the ack part(s) to the envelope. 
Iterator sequenceAckIter = ackRMMsgCtx .getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Wed Feb 28 07:21:17 2007 @@ -303,13 +303,9 @@ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId); rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new Long (msgNo)); -// adding of acks - -// if acksTo anonymous -// add an ack entry with an infinite ack interval so that it will be piggybacked by any possible response message -// else -// add an ack entry here - + // We only create an ack message if: + // - We have anonymous acks, and the backchannel is free + // - We have async acks boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) || WSDLConstants.MEP_CONSTANT_IN_ONLY == mep; EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR()); @@ -320,18 +316,13 @@ msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE); AcknowledgementManager.sendAckNow(ackRMMsgContext); } - } else { //Scenario 2 and Scenario 3 - // having a negative value for timeToSend will make this behave as having an infinite ack interval. 
- long timeToSend = -1; - if (!acksTo.hasAnonymousAddress()) { - SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation()); - long ackInterval = policyBean.getAcknowledgementInterval(); - timeToSend = System.currentTimeMillis() + ackInterval; - } + } else if (!acksTo.hasAnonymousAddress()) { + SandeshaPolicyBean policyBean = SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation()); + long ackInterval = policyBean.getAcknowledgementInterval(); + long timeToSend = System.currentTimeMillis() + ackInterval; RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true); - AcknowledgementManager.removeAckBeanEntries(sequenceId, storageManager); AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, timeToSend, storageManager); } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Wed Feb 28 07:21:17 2007 @@ -315,18 +315,11 @@ RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, rmdBean, sequenceId, storageManager, true); + // copy over the ack parts Iterator iter = ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT); - - if (iter.hasNext()) { + while (iter.hasNext()) { SequenceAcknowledgement seqAck = (SequenceAcknowledgement) iter.next(); - if (seqAck==null) { - String message = "No SequenceAcknowledgement part is present"; - throw new SandeshaException (message); - } - 
terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, seqAck); - } else { - //TODO } terminateSeqResponseRMMsg.addSOAPEnvelope(); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Wed Feb 28 07:21:17 2007 @@ -19,14 +19,9 @@ import java.util.Collection; import java.util.Iterator; -import java.util.List; -import javax.xml.namespace.QName; - -import org.apache.axiom.om.OMElement; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFactory; -import org.apache.axiom.soap.SOAPHeader; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; import org.apache.axis2.addressing.EndpointReference; @@ -46,10 +41,7 @@ import org.apache.sandesha2.storage.StorageManager; import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; import org.apache.sandesha2.storage.beans.RMDBean; -import org.apache.sandesha2.storage.beans.RMSBean; import org.apache.sandesha2.storage.beans.SenderBean; -import org.apache.sandesha2.wsrm.Sequence; -import org.apache.sandesha2.wsrm.SequenceAcknowledgement; /** * Contains logic for managing acknowledgements. 
@@ -71,29 +63,31 @@ if (log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent"); - ConfigurationContext configurationContext = rmMessageContext.getConfigurationContext(); SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr(); - // If this message is going to an anonymous address then we add in an ack for the - // sequence that was used on the inbound side. + // If this message is going to an anonymous address, and the inbound sequence has + // anonymous acksTo, then we add in an ack for the inbound sequence. EndpointReference target = rmMessageContext.getTo(); if(target.hasAnonymousAddress()) { - Sequence sequence = (Sequence) rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); - if(sequence != null) { - String outboundSequenceId = sequence.getIdentifier().getIdentifier(); - RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outboundSequenceId); - String outboundInternalSeq = rmsBean.getInternalSequenceID(); - String inboundSequenceId = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(outboundInternalSeq); - RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequenceId); - if (rmdBean != null) { - if(log.isDebugEnabled()) log.debug("Piggybacking ack for " + inboundSequenceId); - RMMsgCreator.addAckMessage(rmMessageContext, rmsBean, inboundSequenceId, rmdBean); + String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID); + if(inboundSequence != null) { + RMDBean inboundBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence); + if(inboundBean != null) { + String acksTo = inboundBean.getAcksToEPR(); + EndpointReference acksToEPR = new EndpointReference(acksTo); + + if(acksTo == null || acksToEPR.hasAnonymousAddress()) { + if(log.isDebugEnabled()) log.debug("Piggybacking ack for inbound sequence: " + inboundSequence); + 
RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean); + } } } if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon"); return; } + // From here on, we must be dealing with a real address. Piggyback all sequences that have an + // acksTo that matches the To address, and that have an ackMessage queued up for sending. SenderBean findBean = new SenderBean(); findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK); findBean.setSend(true); @@ -101,67 +95,26 @@ Collection collection = retransmitterBeanMgr.find(findBean); Iterator it = collection.iterator(); - while (it.hasNext()) { SenderBean ackBean = (SenderBean) it.next(); + // Piggybacking will happen only if the end of ack interval (timeToSend) is not reached. long timeNow = System.currentTimeMillis(); if (ackBean.getTimeToSend() > timeNow) { - // //Piggybacking will happen only if the end of ack interval - // (timeToSend) is not reached. - - MessageContext ackMsgContext = storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(), - configurationContext); - - if (log.isDebugEnabled()) log.debug("Adding ack headers"); - - // deleting the ack entry. + // Delete the beans that would have sent the ack retransmitterBeanMgr.delete(ackBean.getMessageID()); + storageManager.removeMessageContext(ackBean.getMessageContextRefKey()); - // Adding the ack(s) to the application message - boolean acks = false; - MessageContext messageContext = rmMessageContext.getMessageContext(); - SOAPHeader appMsgHeaders = messageContext.getEnvelope().getHeader(); - - // If the App message doesn't have a SOAP Header, create one here. 
- if (appMsgHeaders == null) { - SOAPFactory factory = (SOAPFactory) messageContext.getEnvelope().getOMFactory(); - appMsgHeaders = factory.createSOAPHeader(messageContext.getEnvelope()); - } - - SOAPHeader headers = ackMsgContext.getEnvelope().getHeader(); - if(headers != null) { - for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) { - - QName name = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK); - Iterator iter = headers.getChildrenWithName(name); - while(iter.hasNext()) { - OMElement ackElement = (OMElement) iter.next(); - - SequenceAcknowledgement sequenceAcknowledgement = new SequenceAcknowledgement (Sandesha2Constants.SPEC_NS_URIS[i]); - sequenceAcknowledgement.fromOMElement(ackElement); - - sequenceAcknowledgement.toOMElement(appMsgHeaders); - acks = true; - - // Make sure that the outbound message is secured with the token that - // matches the ack. - String seqId = sequenceAcknowledgement.getIdentifier().getIdentifier(); - RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, seqId); - RMMsgCreator.secureOutboundMessage(rmdBean, messageContext); - } - } - } - - if (!acks) { - String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckMessageEntry, - ackMsgContext.getEnvelope().toString()); - log.debug(message); - throw new SandeshaException(message); + String sequenceId = ackBean.getSequenceID(); + if (log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId); + + RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId); + if(rmdBean != null) { + RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean); } } } - + if (log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent"); } @@ -224,7 +177,7 @@ ackMsgCtx.setServerSide(serverSide); // adding the SequenceAcknowledgement part. 
- RMMsgCreator.addAckMessage(ackRMMsgCtx, rmdBean ,sequenceId, rmdBean); + RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean); if (log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::generateAckMessage"); @@ -245,37 +198,19 @@ } if (log.isDebugEnabled()) - log.debug("Enter: AcknowledgementManager::verifySequenceCompletion " + result); + log.debug("Exit: AcknowledgementManager::verifySequenceCompletion " + result); return result; } - public static void addFinalAcknowledgement () { - - } - - public static void removeAckBeanEntries (String sequenceId, StorageManager storageManager) throws SandeshaException { - SenderBean findBean = new SenderBean (); - findBean.setSequenceID(sequenceId); - findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK); - - SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr(); - List senderBeans = senderBeanMgr.find(findBean); - - for (Iterator it = senderBeans.iterator();it.hasNext();) { - SenderBean bean = (SenderBean) it.next(); - senderBeanMgr.delete(bean.getMessageID()); - } - - } - public static void addAckBeanEntry ( RMMsgContext ackRMMsgContext, String sequenceId, long timeToSend, StorageManager storageManager) throws AxisFault { + if(log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::addAckBeanEntry"); - // / Transaction asyncAckTransaction = - // storageManager.getTransaction(); + // Write the acks into the envelope + ackRMMsgContext.addSOAPEnvelope(); MessageContext ackMsgContext = ackRMMsgContext.getMessageContext(); @@ -297,28 +232,19 @@ ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK); - - // Ack will be sent as stand alone, only after the retransmitter - // interval. -// long timeToSend = System.currentTimeMillis() + ackInterval; - // removing old acks. SenderBean findBean = new SenderBean(); findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK); - - // this will be set to true in the sandesha2TransportSender. 
findBean.setSend(true); findBean.setReSend(false); + findBean.setSequenceID(sequenceId); Collection coll = retransmitterBeanMgr.find(findBean); Iterator it = coll.iterator(); - if (it.hasNext()) { + while(it.hasNext()) { SenderBean oldAckBean = (SenderBean) it.next(); - timeToSend = oldAckBean.getTimeToSend(); // If there is an - // old ack. This ack - // will be sent in - // the old - // timeToSend. + if(oldAckBean.getTimeToSend() < timeToSend) + timeToSend = oldAckBean.getTimeToSend(); // removing the retransmitted entry for the oldAck retransmitterBeanMgr.delete(oldAckBean.getMessageID()); @@ -331,8 +257,6 @@ ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE); - //asyncAckTransaction.commit(); - // passing the message through sandesha2sender ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE); @@ -341,12 +265,16 @@ // inserting the new ack. retransmitterBeanMgr.insert(ackBean); + if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::addAckBeanEntry"); } public static void sendAckNow (RMMsgContext ackRMMsgContext) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::sendAckNow"); + // Write the acks into the envelope + ackRMMsgContext.addSOAPEnvelope(); + MessageContext ackMsgContext = ackRMMsgContext.getMessageContext(); ConfigurationContext configContext = ackMsgContext.getConfigurationContext(); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Wed Feb 28 07:21:17 2007 @@ -22,6 
+22,7 @@ import org.apache.axiom.om.OMElement; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFactory; +import org.apache.axiom.soap.SOAPHeader; import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.addressing.EndpointReference; @@ -398,15 +399,13 @@ * @param sequenceId * @throws SandeshaException */ - public static void addAckMessage(RMMsgContext applicationMsg, RMSequenceBean rmBean, - String sequenceId, RMDBean rmdBean) + public static void addAckMessage(RMMsgContext applicationMsg, String sequenceId, RMDBean rmdBean) throws SandeshaException { if(log.isDebugEnabled()) log.debug("Entry: RMMsgCreator::addAckMessage " + sequenceId); - SOAPEnvelope envelope = applicationMsg.getSOAPEnvelope(); - - String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmBean.getRMVersion()); + String rmVersion = rmdBean.getRMVersion(); + String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion); SequenceAcknowledgement sequenceAck = new SequenceAcknowledgement(rmNamespaceValue); Identifier id = new Identifier(rmNamespaceValue); @@ -418,7 +417,7 @@ if (rmdBean.isClosed()) { // sequence is closed. so add the 'Final' part. 
- if (SpecSpecificConstants.isAckFinalAllowed(rmBean.getRMVersion())) { + if (SpecSpecificConstants.isAckFinalAllowed(rmVersion)) { AckFinal ackFinal = new AckFinal(rmNamespaceValue); sequenceAck.setAckFinal(ackFinal); } @@ -426,20 +425,20 @@ applicationMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, sequenceAck); - sequenceAck.toOMElement(envelope.getHeader()); - if (applicationMsg.getWSAAction()==null) { - applicationMsg.setAction(SpecSpecificConstants.getSequenceAcknowledgementAction(rmBean.getRMVersion())); - applicationMsg.setSOAPAction(SpecSpecificConstants.getSequenceAcknowledgementSOAPAction(rmBean.getRMVersion())); + applicationMsg.setAction(SpecSpecificConstants.getSequenceAcknowledgementAction(rmVersion)); + applicationMsg.setSOAPAction(SpecSpecificConstants.getSequenceAcknowledgementSOAPAction(rmVersion)); + } + if(applicationMsg.getMessageId() == null) { + applicationMsg.setMessageId(SandeshaUtil.getUUID()); } - applicationMsg.setMessageId(SandeshaUtil.getUUID()); - - //generating the SOAP envelope. 
+ // Write the ack into the soap envelope try { applicationMsg.addSOAPEnvelope(); } catch(AxisFault e) { - throw new SandeshaException(e); + if(log.isDebugEnabled()) log.debug("Caught AxisFault", e); + throw new SandeshaException(e.getMessage(), e); } // Ensure the message also contains the token that needs to be used Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=512799&r1=512798&r2=512799 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Wed Feb 28 07:21:17 2007 @@ -161,12 +161,8 @@ int messageType = senderBean.getMessageType(); - if (isAckPiggybackableMsgType(messageType)) { // checking weather this message can carry piggybacked acks - // checking weather this message can carry piggybacked acks - // piggybacking if an ack if available for the same - // sequence. - // TODO do piggybacking based on wsa:To - + if (isAckPiggybackableMsgType(messageType)) { + // Piggyback ack messages based on the 'To' address of the message AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]