Author: gatfora
Date: Fri Jan 26 01:09:46 2007
New Revision: 500191

URL: http://svn.apache.org/viewvc?view=rev&rev=500191
Log:
Refactor ack processing inside SequenceProcessor

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=500191&r1=500190&r2=500191
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri Jan 26 01:09:46 2007
@@ -400,9 +400,8 @@
                        rmsBean.setNextMessageNumber(messageNumber);
                
                if (messageNumber == 1 && !sendCreateSequence) {
-                       // if first message - setup the sending side sequence - both for the
-                       // server and the client sides
-                       SequenceManager.setupNewClientSequence(msgContext, sequencePropertyKey, specVersion, storageManager);
+                       // Start the sender for the service side.
+                       SandeshaUtil.startSenderForTheSequence(configContext, outSequenceID);
                }
 
                RelatesTo relatesTo = msgContext.getRelatesTo();

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=500191&r1=500190&r2=500191
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Fri Jan 26 01:09:46 2007
@@ -30,8 +30,8 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
 import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +46,6 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
@@ -185,11 +184,7 @@
                        }
                        
                    String outgoingSideInternalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
-                   RMSBean findRMSBean = new RMSBean ();
-                   findRMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
-                   
-                   RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
-                   RMSBean rmsBean = rmsBeanMgr.findUnique (findRMSBean);
+                   RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, outgoingSideInternalSequenceId);
                    if (rmsBean==null) {
                        String message = "Cannot find a entries for the response side sequence";
                        throw new SandeshaException (message);
@@ -216,7 +211,7 @@
                        Long msgNoOfInMsg = (Long) 
outMessageContext.getProperty(Sandesha2Constants.MSG_NO_OF_IN_MSG);
                        if (msgNoOfInMsg == null) {
                                        MessageContext inMsgContextOfOutMessage 
= outMessageContext.getOperationContext()
-                                                                               
                                          
.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+                                                                               
                                          
.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
                                        RMMsgContext inRMMsgContextOfOutMessage 
= MsgInitializer.initializeMessage(inMsgContextOfOutMessage);
 
                                        if (inMsgContextOfOutMessage != null) {
@@ -326,34 +321,35 @@
 //             else 
 //                     add an ack entry here
                
+               EndpointReference acksTo = new EndpointReference 
(bean.getAcksToEPR());
                
-               String acksToAddress = bean.getAcksToEPR();
-               EndpointReference acksTo = new EndpointReference 
(acksToAddress);
-       
-               if (acksTo!=null && acksTo.hasAnonymousAddress()) {
-                       
-                       long timeToSend = -1;   //having a negative value for 
timeToSend will make this behave as having an infinite ack interval.
-                       RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, 
storageManager,false,true);
-                       AcknowledgementManager.removeAckBeanEntries(sequenceId, 
storageManager);
-                       AcknowledgementManager.addAckBeanEntry(ackRMMsgContext 
,sequenceId, timeToSend, storageManager);
+               if (acksTo!=null && acksTo.hasAnonymousAddress() && 
+                         WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep)) {
+                       Object responseWritten = 
msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
+                       if (responseWritten==null || 
!Constants.VALUE_TRUE.equals(responseWritten)) {
+                               RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, 
storageManager,false,true);
+                               
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
 Constants.VALUE_TRUE);
+                               
AcknowledgementManager.sendAckNow(ackRMMsgContext);
+                       }
                } else { //Scenario 2 and Scenario 3
                        SandeshaPolicyBean policyBean = 
SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
                        if (policyBean==null) {
                                String message = "Cant find the policy bean 
from the passed Axis2 description";
                                throw new SandeshaException (message);
                        }
-                       
-                       long ackInterval = 
policyBean.getAcknowledgementInterval();
-                       long timeToSend = System.currentTimeMillis() + 
ackInterval;
+                       //              having a negative value for timeToSend 
will make this behave as having an infinite ack interval.
+                       long timeToSend = -1;   
+                       if (acksTo!=null && !acksTo.hasAnonymousAddress()) {
+                               long ackInterval = 
policyBean.getAcknowledgementInterval();
+                               timeToSend = System.currentTimeMillis() + 
ackInterval;
+                       }
                        
                        RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx, sequenceId, 
storageManager,false,true);
 
                        AcknowledgementManager.removeAckBeanEntries(sequenceId, 
storageManager);
                        AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, 
sequenceId, timeToSend, storageManager);
                }
-               
-               
-               
+
                if (inOrderInvocation) {
 
                        //if replyTo is anonymous and this is not an InOnly 
message
@@ -376,29 +372,9 @@
                                        //ack bean entry added previously may 
cause an ack to be piggybacked.
                                } else {
                                        result = InvocationResponse.ABORT;
-                                       
-                                       //in this case, we will be adding a 
sync ack if acksTo is anonymous.
-                                       if (acksTo!=null && 
acksTo.hasAnonymousAddress()) {
-                                               Object responseWritten = 
msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
-                                               if (responseWritten==null || 
!Constants.VALUE_TRUE.equals(responseWritten)) {
-                                                       RMMsgContext 
ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx , 
sequenceId, storageManager,false,true);
-                                                       
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
 Constants.VALUE_TRUE);
-                                                       
AcknowledgementManager.sendAckNow(ackRMMsgContext);
-                                               }
-                                       }
                                }
                        } else {
                                result = InvocationResponse.ABORT;
-                       
-                               //in this case, we will be adding a sync ack if 
acksTo is anonymous.
-                               if (acksTo!=null && 
acksTo.hasAnonymousAddress()) {
-                                       Object responseWritten = 
msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
-                                       if (responseWritten==null || 
!Constants.VALUE_TRUE.equals(responseWritten)) {
-                                               RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, 
storageManager,false,true);
-                                               
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
 Constants.VALUE_TRUE);
-                                               
AcknowledgementManager.sendAckNow(ackRMMsgContext);
-                                       }
-                               }
                        }
 
                        
@@ -421,7 +397,6 @@
                        // Starting the invoker if stopped.
                        
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), 
sequenceId);
                }
-
                
                if (log.isDebugEnabled())
                        log.debug("Exit: 
SequenceProcessor::processReliableMessage " + result);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to