Author: chamikara
Date: Sun Nov 20 21:05:32 2005
New Revision: 345831

URL: http://svn.apache.org/viewcvs?rev=345831&view=rev
Log:
TerminateManager was added.
It contains the logic to clean up the data stored for a sequence.
Currently the cleanup happens as follows:
   Sending side -> after sending the TerminateSequence message
   Receiving side ->
            IF (NOT_INORDER_INVOCATION)
               at the TerminateSeqMsgProcessor
            ELSE
               at the TerminateSeqMsgProcessor + InOrderInvoker

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=345831&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java 
(added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java 
Sun Nov 20 21:05:32 2005
@@ -0,0 +1,160 @@
+/*

+ * Copyright 1999-2004 The Apache Software Foundation.

+ * 

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not

+ * use this file except in compliance with the License. You may obtain a copy 
of

+ * the License at

+ * 

+ * http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

+ * License for the specific language governing permissions and limitations 
under

+ * the License.

+ *  

+ */

+

+package org.apache.sandesha2;

+

+import java.util.ArrayList;

+import java.util.Collection;

+import java.util.Iterator;

+

+import org.apache.axis2.context.ConfigurationContext;

+import org.apache.sandesha2.storage.StorageManager;

+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;

+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;

+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;

+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;

+import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;

+import org.apache.sandesha2.storage.beans.CreateSeqBean;

+import org.apache.sandesha2.storage.beans.NextMsgBean;

+import org.apache.sandesha2.storage.beans.RetransmitterBean;

+import org.apache.sandesha2.storage.beans.SequencePropertyBean;

+import org.apache.sandesha2.storage.beans.StorageMapBean;

+import org.apache.sandesha2.util.SandeshaUtil;

+

+/**

+ * @author Chamikara Jayalath <[EMAIL PROTECTED]>

+ */

+

+public class TerminateManager {

+

+       public static void terminateReceivingSide (ConfigurationContext 
configContext, String sequenceID) throws SandeshaException {

+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);

+               NextMsgBeanMgr nextMsgBeanMgr = 
storageManager.getNextMsgBeanMgr();

+               

+               //removing nextMsgMgr entries

+               NextMsgBean findNextMsgBean = new NextMsgBean ();

+               findNextMsgBean.setSequenceId(sequenceID);

+               Collection collection = nextMsgBeanMgr.find(findNextMsgBean);

+               Iterator iterator = collection.iterator();

+               while (iterator.hasNext()) {

+                       NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();

+                       nextMsgBeanMgr.delete(nextMsgBean.getSequenceId());

+               }

+               

+               
if(Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE!=Constants.QOS.DeliveryAssurance.IN_ORDER)
 { 

+                       terminateAfterInvocation(configContext,sequenceID);

+               }

+

+       }

+       

+       public static void terminateAfterInvocation (ConfigurationContext 
configContext, String sequenceID) throws SandeshaException {

+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);

+               SequencePropertyBeanMgr sequencePropertyBeanMgr = 
storageManager.getSequencePropretyBeanMgr();

+               StorageMapBeanMgr storageMapBeanMgr = 
storageManager.getStorageMapBeanMgr();

+

+               //removing storageMap entries

+               StorageMapBean findStorageMapBean = new StorageMapBean ();

+               findStorageMapBean.setSequenceId(sequenceID);

+               Collection collection = 
storageMapBeanMgr.find(findStorageMapBean);

+               Iterator iterator = collection.iterator();

+               while (iterator.hasNext()) {

+                       StorageMapBean storageMapBean = (StorageMapBean) 
iterator.next();

+                       storageMapBeanMgr.delete(storageMapBean.getKey());

+               }

+               

+               //TODO - refine below (removing sequence properties of the 
receiving side).

+               //removing sequencePropertyEntries.

+//             SequencePropertyBean findSequencePropBean = new 
SequencePropertyBean ();

+//             findSequencePropBean.setSequenceId(sequenceID);

+//             collection = sequencePropertyBeanMgr.find(findSequencePropBean);

+//             iterator = collection.iterator();

+//             while (iterator.hasNext()) {

+//                     SequencePropertyBean sequencePropertyBean = 
(SequencePropertyBean) iterator.next();

+//                     boolean propertyRequiredForResponses = 
isRequiredForResponseSide (sequencePropertyBean.getName());

+//                     if (!propertyRequiredForResponses)

+//                             
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());

+//             }

+               

+               SequencePropertyBean allSequenceBean = 
sequencePropertyBeanMgr.retrieve(Constants.SequenceProperties.ALL_SEQUENCES,Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);

+               ArrayList allSequenceList = (ArrayList) 
allSequenceBean.getValue();

+               

+               allSequenceList.remove(sequenceID);

+       }

+       

+       private static boolean isRequiredForResponseSide (String name) {

+               if (name==null && 
name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))

+                       return false;

+               

+               if (name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))

+                       return false;

+               

+               return false;

+       }

+       

+       public static void terminateSendingSide (ConfigurationContext 
configContext, String sequenceID) throws SandeshaException {

+               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext);

+               SequencePropertyBeanMgr sequencePropertyBeanMgr = 
storageManager.getSequencePropretyBeanMgr();

+               RetransmitterBeanMgr retransmitterBeanMgr = 
storageManager.getRetransmitterBeanMgr();

+               CreateSeqBeanMgr createSeqBeanMgr = 
storageManager.getCreateSeqBeanMgr();

+               

+               SequencePropertyBean tempSequenceBean = 
sequencePropertyBeanMgr.retrieve(sequenceID,Constants.SequenceProperties.TEMP_SEQUENCE_ID);

+               if (tempSequenceBean==null)

+                       throw new SandeshaException ("TempSequence entry not 
found");

+               

+               String tempSequenceId = (String) tempSequenceBean.getValue();

+               

+               //removing retransmitterMgr entries

+               RetransmitterBean findRetransmitterBean = new RetransmitterBean 
();

+               findRetransmitterBean.setTempSequenceId(tempSequenceId);

+               Collection collection = 
retransmitterBeanMgr.find(findRetransmitterBean);

+               Iterator iterator = collection.iterator();

+               while (iterator.hasNext()) {

+                       RetransmitterBean retransmitterBean = 
(RetransmitterBean) iterator.next();

+                       
retransmitterBeanMgr.delete(retransmitterBean.getMessageId());

+               }

+               

+               //removing the createSeqMgrEntry

+               CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();

+               findCreateSequenceBean.setTempSequenceId(tempSequenceId);

+               collection = createSeqBeanMgr.find(findCreateSequenceBean);

+               iterator = collection.iterator();

+               while (iterator.hasNext()) {

+                       CreateSeqBean createSeqBean = (CreateSeqBean) 
iterator.next();

+                       
createSeqBeanMgr.delete(createSeqBean.getCreateSeqMsgId());

+               }

+               

+               //removing sequence properties

+               SequencePropertyBean findSequencePropertyBean1 = new 
SequencePropertyBean ();

+               findSequencePropertyBean1.setSequenceId(tempSequenceId);

+               collection = 
sequencePropertyBeanMgr.find(findSequencePropertyBean1);

+               iterator = collection.iterator();

+               while (iterator.hasNext()) {

+                       SequencePropertyBean sequencePropertyBean = 
(SequencePropertyBean) iterator.next();

+                       
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());

+               }

+               

+               SequencePropertyBean findSequencePropertyBean2 = new 
SequencePropertyBean ();

+               findSequencePropertyBean2.setSequenceId(tempSequenceId);

+               collection = 
sequencePropertyBeanMgr.find(findSequencePropertyBean2);

+               iterator = collection.iterator();

+               while (iterator.hasNext()) {

+                       SequencePropertyBean sequencePropertyBean = 
(SequencePropertyBean) iterator.next();

+                       
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());

+               }

+               

+       }

+}


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Sun Nov 20 21:05:32 2005
@@ -18,11 +18,20 @@
 package org.apache.sandesha2.msgprocessors;

 

 import javax.xml.namespace.QName;

+

+import org.apache.axis2.context.ConfigurationContext;

 import org.apache.axis2.context.MessageContext;

 import org.apache.sandesha2.Constants;

 import org.apache.sandesha2.RMMsgContext;

 import org.apache.sandesha2.SandeshaException;

+import org.apache.sandesha2.TerminateManager;

+import org.apache.sandesha2.storage.StorageManager;

+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;

+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;

+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;

+import org.apache.sandesha2.util.SandeshaUtil;

 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;

+import org.apache.sandesha2.wsrm.TerminateSequence;

 

 /**

  * @author Chamikara Jayalath <[EMAIL PROTECTED]>

@@ -44,6 +53,19 @@
 

                //Processing the terminate message

                //TODO Add terminate sequence message logic.

+               TerminateSequence terminateSequence = (TerminateSequence) 
terminateSeqRMMSg.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);

+               if (terminateSequence==null)

+                       throw new SandeshaException ("Terminate Sequence part 
is not available");

+               

+               String sequenceId = 
terminateSequence.getIdentifier().getIdentifier();

+               if (sequenceId==null || "".equals(sequenceId))

+                       throw new SandeshaException ("Invalid sequence id");

+               

+               ConfigurationContext context = 
terminateSeqMsg.getSystemContext();

+

+               

+               TerminateManager.terminateReceivingSide(context,sequenceId);

+               

 

                terminateSeqMsg.setPausedTrue(new 
QName(Constants.IN_HANDLER_NAME));

 


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
 Sun Nov 20 21:05:32 2005
@@ -21,6 +21,9 @@
 import org.apache.axis2.AxisFault;

 import org.apache.axis2.context.MessageContext;

 import org.apache.axis2.receivers.AbstractMessageReceiver;

+import org.apache.sandesha2.RMMsgContext;

+import org.apache.sandesha2.util.MsgInitializer;

+import org.apache.sandesha2.util.SandeshaUtil;

 

 /**

  * @author Chamikara Jayalath <[EMAIL PROTECTED]>

@@ -36,6 +39,9 @@
 

        public final void receive(MessageContext messgeCtx) throws AxisFault {

                System.out.println("RM MESSSAGE RECEIVER WAS CALLED");

+               

+               RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(messgeCtx);

+               System.out.println("MsgReceiver got type:" + 
SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));

        }

        

 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java 
(original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java 
Sun Nov 20 21:05:32 2005
@@ -27,6 +27,7 @@
 import org.apache.sandesha2.Constants;

 import org.apache.sandesha2.RMMsgContext;

 import org.apache.sandesha2.SandeshaException;

+import org.apache.sandesha2.TerminateManager;

 import org.apache.sandesha2.storage.StorageManager;

 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;

 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;

@@ -101,7 +102,7 @@
                                                .getValue();

                                Iterator seqPropIt = seqPropList.iterator();

 

-                               while (seqPropIt.hasNext()) {

+                               currentIteration: while (seqPropIt.hasNext()) {

 

                                        String sequenceId = (String) 
seqPropIt.next();

 

@@ -168,10 +169,24 @@
                                                                .find(

                                                                                
new StorageMapBean(null, nextMsgno,

                                                                                
                sequenceId)).iterator();

+

+                                               //terminate (AfterInvocation)

+                                               if (rmMsg.getMessageType() == 
Constants.MessageTypes.APPLICATION) {

+                                                       Sequence sequence = 
(Sequence) rmMsg

+                                                                       
.getMessagePart(Constants.MessageParts.SEQUENCE);

+                                                       if 
(sequence.getLastMessage() != null) {

+                                                               
TerminateManager.terminateAfterInvocation(

+                                                                               
context, sequenceId);

+                                                               

+                                                               //exit from 
current iteration. (since an entry was removed)

+                                                               break 
currentIteration;

+                                                       }

+                                               }

                                        }

 

                                        
nextMsgBean.setNextMsgNoToProcess(nextMsgno);

                                        nextMsgMgr.update(nextMsgBean);

+

                                }

                        } catch (SandeshaException e1) {

                                // TODO Auto-generated catch block


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun 
Nov 20 21:05:32 2005
@@ -28,12 +28,14 @@
 import org.apache.sandesha2.Constants;

 import org.apache.sandesha2.RMMsgContext;

 import org.apache.sandesha2.SandeshaException;

+import org.apache.sandesha2.TerminateManager;

 import org.apache.sandesha2.storage.StorageManager;

 import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;

 import org.apache.sandesha2.storage.beans.RetransmitterBean;

 import org.apache.sandesha2.util.MessageRetransmissionAdjuster;

 import org.apache.sandesha2.util.MsgInitializer;

 import org.apache.sandesha2.util.SandeshaUtil;

+import org.apache.sandesha2.wsrm.TerminateSequence;

 

 /**

  * @author Chamikara Jayalath <[EMAIL PROTECTED]>

@@ -121,6 +123,16 @@
 

                                                if (!msgCtx.isServerSide())

                                                        
checkForSyncResponses(msgCtx);

+                                               

+                                               

+                                               if 
(rmMsgCtx.getMessageType()==Constants.MessageTypes.TERMINATE_SEQ) {

+                                                       //terminate sending 
side.

+                                                       TerminateSequence 
terminateSequence = (TerminateSequence) 
rmMsgCtx.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);

+                                                       String sequenceID = 
terminateSequence.getIdentifier().getIdentifier();

+                                                       ConfigurationContext 
configContext = msgCtx.getSystemContext();

+                                                       

+                                                       
TerminateManager.terminateSendingSide(configContext,sequenceID);

+                                               }

 

                                        } catch (AxisFault e1) {

                                                e1.printStackTrace();




---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to