Author: chamikara
Date: Sun Nov 20 21:05:32 2005
New Revision: 345831
URL: http://svn.apache.org/viewcvs?rev=345831&view=rev
Log:
TerminateManager was added.
this has logic to clean a data saved due to a sequence.
currently cleaning happens as following
Sending side -> After sending the terminate seq message
Receiving side ->
IF (NOT_INORDER_INVOCATION)
at the TerminateMsgProcessor
ELSE
at the terminateMsgProcessor + InOrderInvoker
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=345831&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
(added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Sun Nov 20 21:05:32 2005
@@ -0,0 +1,160 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.storage.beans.StorageMapBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * @author Chamikara Jayalath <[EMAIL PROTECTED]>
+ */
+
+public class TerminateManager {
+
+ public static void terminateReceivingSide (ConfigurationContext
configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
+ NextMsgBeanMgr nextMsgBeanMgr =
storageManager.getNextMsgBeanMgr();
+
+ //removing nextMsgMgr entries
+ NextMsgBean findNextMsgBean = new NextMsgBean ();
+ findNextMsgBean.setSequenceId(sequenceID);
+ Collection collection = nextMsgBeanMgr.find(findNextMsgBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
+ nextMsgBeanMgr.delete(nextMsgBean.getSequenceId());
+ }
+
+
if(Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE!=Constants.QOS.DeliveryAssurance.IN_ORDER)
{
+ terminateAfterInvocation(configContext,sequenceID);
+ }
+
+ }
+
+ public static void terminateAfterInvocation (ConfigurationContext
configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
+ StorageMapBeanMgr storageMapBeanMgr =
storageManager.getStorageMapBeanMgr();
+
+ //removing storageMap entries
+ StorageMapBean findStorageMapBean = new StorageMapBean ();
+ findStorageMapBean.setSequenceId(sequenceID);
+ Collection collection =
storageMapBeanMgr.find(findStorageMapBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ StorageMapBean storageMapBean = (StorageMapBean)
iterator.next();
+ storageMapBeanMgr.delete(storageMapBean.getKey());
+ }
+
+ //TODO - refine below (removing sequence properties of the
receiving side).
+ //removing sequencePropertyEntries.
+// SequencePropertyBean findSequencePropBean = new
SequencePropertyBean ();
+// findSequencePropBean.setSequenceId(sequenceID);
+// collection = sequencePropertyBeanMgr.find(findSequencePropBean);
+// iterator = collection.iterator();
+// while (iterator.hasNext()) {
+// SequencePropertyBean sequencePropertyBean =
(SequencePropertyBean) iterator.next();
+// boolean propertyRequiredForResponses =
isRequiredForResponseSide (sequencePropertyBean.getName());
+// if (!propertyRequiredForResponses)
+//
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+// }
+
+ SequencePropertyBean allSequenceBean =
sequencePropertyBeanMgr.retrieve(Constants.SequenceProperties.ALL_SEQUENCES,Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ ArrayList allSequenceList = (ArrayList)
allSequenceBean.getValue();
+
+ allSequenceList.remove(sequenceID);
+ }
+
+ private static boolean isRequiredForResponseSide (String name) {
+ if (name==null &&
name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))
+ return false;
+
+ if (name.equals(Constants.SequenceProperties.LAST_OUT_MESSAGE))
+ return false;
+
+ return false;
+ }
+
+ public static void terminateSendingSide (ConfigurationContext
configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropretyBeanMgr();
+ RetransmitterBeanMgr retransmitterBeanMgr =
storageManager.getRetransmitterBeanMgr();
+ CreateSeqBeanMgr createSeqBeanMgr =
storageManager.getCreateSeqBeanMgr();
+
+ SequencePropertyBean tempSequenceBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Constants.SequenceProperties.TEMP_SEQUENCE_ID);
+ if (tempSequenceBean==null)
+ throw new SandeshaException ("TempSequence entry not
found");
+
+ String tempSequenceId = (String) tempSequenceBean.getValue();
+
+ //removing retransmitterMgr entries
+ RetransmitterBean findRetransmitterBean = new RetransmitterBean
();
+ findRetransmitterBean.setTempSequenceId(tempSequenceId);
+ Collection collection =
retransmitterBeanMgr.find(findRetransmitterBean);
+ Iterator iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ RetransmitterBean retransmitterBean =
(RetransmitterBean) iterator.next();
+
retransmitterBeanMgr.delete(retransmitterBean.getMessageId());
+ }
+
+ //removing the createSeqMgrEntry
+ CreateSeqBean findCreateSequenceBean = new CreateSeqBean ();
+ findCreateSequenceBean.setTempSequenceId(tempSequenceId);
+ collection = createSeqBeanMgr.find(findCreateSequenceBean);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ CreateSeqBean createSeqBean = (CreateSeqBean)
iterator.next();
+
createSeqBeanMgr.delete(createSeqBean.getCreateSeqMsgId());
+ }
+
+ //removing sequence properties
+ SequencePropertyBean findSequencePropertyBean1 = new
SequencePropertyBean ();
+ findSequencePropertyBean1.setSequenceId(tempSequenceId);
+ collection =
sequencePropertyBeanMgr.find(findSequencePropertyBean1);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean sequencePropertyBean =
(SequencePropertyBean) iterator.next();
+
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+ }
+
+ SequencePropertyBean findSequencePropertyBean2 = new
SequencePropertyBean ();
+ findSequencePropertyBean2.setSequenceId(tempSequenceId);
+ collection =
sequencePropertyBeanMgr.find(findSequencePropertyBean2);
+ iterator = collection.iterator();
+ while (iterator.hasNext()) {
+ SequencePropertyBean sequencePropertyBean =
(SequencePropertyBean) iterator.next();
+
sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceId(),sequencePropertyBean.getName());
+ }
+
+ }
+}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Sun Nov 20 21:05:32 2005
@@ -18,11 +18,20 @@
package org.apache.sandesha2.msgprocessors;
import javax.xml.namespace.QName;
+
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* @author Chamikara Jayalath <[EMAIL PROTECTED]>
@@ -44,6 +53,19 @@
//Processing the terminate message
//TODO Add terminate sequence message logic.
+ TerminateSequence terminateSequence = (TerminateSequence)
terminateSeqRMMSg.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);
+ if (terminateSequence==null)
+ throw new SandeshaException ("Terminate Sequence part
is not available");
+
+ String sequenceId =
terminateSequence.getIdentifier().getIdentifier();
+ if (sequenceId==null || "".equals(sequenceId))
+ throw new SandeshaException ("Invalid sequence id");
+
+ ConfigurationContext context =
terminateSeqMsg.getSystemContext();
+
+
+ TerminateManager.terminateReceivingSide(context,sequenceId);
+
terminateSeqMsg.setPausedTrue(new
QName(Constants.IN_HANDLER_NAME));
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
Sun Nov 20 21:05:32 2005
@@ -21,6 +21,9 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
* @author Chamikara Jayalath <[EMAIL PROTECTED]>
@@ -36,6 +39,9 @@
public final void receive(MessageContext messgeCtx) throws AxisFault {
System.out.println("RM MESSSAGE RECEIVER WAS CALLED");
+
+ RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(messgeCtx);
+ System.out.println("MsgReceiver got type:" +
SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
Sun Nov 20 21:05:32 2005
@@ -27,6 +27,7 @@
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
@@ -101,7 +102,7 @@
.getValue();
Iterator seqPropIt = seqPropList.iterator();
- while (seqPropIt.hasNext()) {
+ currentIteration: while (seqPropIt.hasNext()) {
String sequenceId = (String)
seqPropIt.next();
@@ -168,10 +169,24 @@
.find(
new StorageMapBean(null, nextMsgno,
sequenceId)).iterator();
+
+ //terminate (AfterInvocation)
+ if (rmMsg.getMessageType() ==
Constants.MessageTypes.APPLICATION) {
+ Sequence sequence =
(Sequence) rmMsg
+
.getMessagePart(Constants.MessageParts.SEQUENCE);
+ if
(sequence.getLastMessage() != null) {
+
TerminateManager.terminateAfterInvocation(
+
context, sequenceId);
+
+ //exit from
current iteration. (since an entry was removed)
+ break
currentIteration;
+ }
+ }
}
nextMsgBean.setNextMsgNoToProcess(nextMsgno);
nextMsgMgr.update(nextMsgBean);
+
}
} catch (SandeshaException e1) {
// TODO Auto-generated catch block
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345831&r1=345830&r2=345831&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun
Nov 20 21:05:32 2005
@@ -28,12 +28,14 @@
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
import org.apache.sandesha2.storage.beans.RetransmitterBean;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* @author Chamikara Jayalath <[EMAIL PROTECTED]>
@@ -121,6 +123,16 @@
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
+
+
+ if
(rmMsgCtx.getMessageType()==Constants.MessageTypes.TERMINATE_SEQ) {
+ //terminate sending
side.
+ TerminateSequence
terminateSequence = (TerminateSequence)
rmMsgCtx.getMessagePart(Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID =
terminateSequence.getIdentifier().getIdentifier();
+ ConfigurationContext
configContext = msgCtx.getSystemContext();
+
+
TerminateManager.terminateSendingSide(configContext,sequenceID);
+ }
} catch (AxisFault e1) {
e1.printStackTrace();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]