Author: chamikara
Date: Mon Jan 30 07:55:20 2006
New Revision: 373537
URL: http://svn.apache.org/viewcvs?rev=373537&view=rev
Log:
Made SimpleAxisServer end correctly after an RM sequence.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
Mon Jan 30 07:55:20 2006
@@ -87,15 +87,14 @@
}
String internalSequenceId = (String)
internalSequenceBean.getValue();
- findBean.setInternalSequenceID(internalSequenceId);
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
findBean.setSend(true);
findBean.setReSend(false);
Collection collection = retransmitterBeanMgr.find(findBean);
+
Iterator it = collection.iterator();
if (it.hasNext()) {
-
SenderBean ackBean = (SenderBean) it.next();
long timeNow = System.currentTimeMillis();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
Mon Jan 30 07:55:20 2006
@@ -192,6 +192,8 @@
String NO_OF_OUTGOING_MSGS_ACKED = "NoOfOutGoingMessagesAcked";
String TRANSPORT_TO = "TransportTo";
+
+ String OUT_SEQ_ACKSTO = "OutSequenceAcksTo";
}
public interface SOAPVersion {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
Mon Jan 30 07:55:20 2006
@@ -22,6 +22,9 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.client.ListenerManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -176,7 +179,8 @@
* @param sequenceID
* @throws SandeshaException
*/
- public static void terminateSendingSide (ConfigurationContext
configContext, String sequenceID) throws SandeshaException {
+ public static void terminateSendingSide (ConfigurationContext
configContext, String sequenceID,boolean serverSide) throws SandeshaException {
+
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext);
//TODO - remove folowing redundant transaction
@@ -186,6 +190,38 @@
SenderBeanMgr retransmitterBeanMgr =
storageManager.getRetransmitterBeanMgr();
CreateSeqBeanMgr createSeqBeanMgr =
storageManager.getCreateSeqBeanMgr();
+
+ if (!serverSide) {
+ //stpoing the listner for the client side.
+
+ //SequencePropertyBean outGoingAcksToBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQ_ACKSTO);
+
+ boolean stopListnerForAsyncAcks = false;
+ SequencePropertyBean internalSequenceBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ if (internalSequenceBean!=null) {
+ String internalSequenceID =
internalSequenceBean.getValue();
+ SequencePropertyBean acksToBean =
sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+ if (acksToBean!=null) {
+ String acksTo = acksToBean.getValue();
+ if (acksTo!=null &&
!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)) {
+ stopListnerForAsyncAcks = true;
+ }
+ }
+ }
+
+ try {
+ //this removes the listner entry for receiving
async acks.
+ if (stopListnerForAsyncAcks)
+
ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
+
+ //TODO stop listner for asyncControlMessages
+
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+ }
+
SequencePropertyBean internalSequenceBean =
sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
if (internalSequenceBean==null)
throw new SandeshaException ("TempSequence entry not
found");
@@ -228,14 +264,6 @@
}
terminateSendingTransaction.commit();
-
- //asking the listner to stop.
- //if (clientSide)
-// try {
-//
ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
-// } catch (AxisFault e) {
-// throw new SandeshaException (e.getMessage());
-// }
SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Mon Jan 30 07:55:20 2006
@@ -290,15 +290,6 @@
.getProperty(MessageContext.TRANSPORT_IN);
if (transportIn == null)
transportIn =
org.apache.axis2.Constants.TRANSPORT_HTTP;
-
- // For receiving async Ack messages.
- // try {
-
ListenerManager.makeSureStarted(transportIn, context);
- // } catch (AxisFault e) {
- // log.debug("Could not start
listener...");
- // log.debug(e.getStackTrace());
- // }
-
} else if (acksTo == null && serverSide) {
String incomingSequencId = SandeshaUtil
.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Mon Jan 30 07:55:20 2006
@@ -123,7 +123,6 @@
rmMsgCtx.setRelatesTo(null);
SenderBean input = new SenderBean();
- input.setInternalSequenceID(internalSequenceId);
input.setSend(true);
input.setReSend(true);
Collection retransmitterEntriesOfSequence = retransmitterMgr
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Mon Jan 30 07:55:20 2006
@@ -411,7 +411,6 @@
//the internalSequenceId value of the retransmitter
Table for the
// messages related to an incoming
//sequence is the actual sequence ID
- ackBean.setInternalSequenceID(sequenceId);
// RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
//
.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
@@ -442,7 +441,6 @@
//removing old acks.
SenderBean findBean = new SenderBean();
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- findBean.setInternalSequenceID(sequenceId);
findBean.setSend(true);
findBean.setReSend(false);
Collection coll = retransmitterBeanMgr.find(findBean);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
Mon Jan 30 07:55:20 2006
@@ -84,7 +84,7 @@
public Collection find(SenderBean bean) {
ArrayList beans = new ArrayList();
- Iterator iterator = table.values().iterator();
+ Iterator iterator = ((Hashtable) table).values().iterator();
SenderBean temp;
while (iterator.hasNext()) {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
Mon Jan 30 07:55:20 2006
@@ -621,9 +621,9 @@
return Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX +
":" + to + ":" +sequenceKey;
}
-// public static String getServerSideInternalSeqID (String incomingSeqId) {
-// return (Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":"
+ incomingSeqId);
-// }
+ public static String getInternalSequenceID (String sequenceID) {
+ return Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX +
":" + sequenceID;
+ }
public static String getSequenceIDFromInternalSequenceID (String
internalSequenceID, ConfigurationContext configurationContext) throws
SandeshaException {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
Mon Jan 30 07:55:20 2006
@@ -10,10 +10,13 @@
import java.util.StringTokenizer;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ListenerManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.Parameter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -188,6 +191,50 @@
SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
+ updateClientSideListnerIfNeeded (firstAplicationMsgCtx);
+
+ }
+
+ private static void updateClientSideListnerIfNeeded (MessageContext
messageContext) throws SandeshaException {
+ if (messageContext.isServerSide())
+ return; //listners are updated only for the client
side.
+
+ String transportInProtocol =
messageContext.getOptions().getTransportInProtocol();
+
+ String acksTo = (String)
messageContext.getProperty(Sandesha2ClientAPI.AcksTo);
+ String mep =
messageContext.getAxisOperation().getMessageExchangePattern();
+
+ boolean startListnerForAsyncAcks = false;
+ boolean startListnerForAsyncControlMsgs = false; //For async
createSerRes & terminateSeq.
+
+ if (acksTo!=null &&
!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)) {
+ //starting listner for async acks.
+ startListnerForAsyncAcks = true;
+ }
+
+ if (mep!=null && !AxisOperation.MEP_URI_OUT_ONLY.equals(mep)) {
+ //starting listner for the async createSeqResponse &
terminateSer messages.
+ startListnerForAsyncControlMsgs = true;
+ }
+
+ try {
+ if ((startListnerForAsyncAcks ||
startListnerForAsyncControlMsgs) && transportInProtocol==null)
+ throw new SandeshaException ("Cant start the
listner since the TransportInProtocol is null");
+
+ if (startListnerForAsyncAcks)
+
ListenerManager.makeSureStarted(messageContext.getOptions().getTransportInProtocol(),messageContext.getConfigurationContext());
+
+ if (startListnerForAsyncControlMsgs)
+
ListenerManager.makeSureStarted(messageContext.getOptions().getTransportInProtocol(),messageContext.getConfigurationContext());
+
+
+ } catch (AxisFault e) {
+ String message = "Cant start the listner for incoming
messages";
+ log.error(e.getStackTrace());
+ System.out.println(e.getStackTrace());
+ throw new SandeshaException (message);
+ }
+
}
/**
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
Mon Jan 30 07:55:20 2006
@@ -197,8 +197,8 @@
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if
(sequence.getLastMessage() != null) {
-
TerminateManager.cleanReceivingSideAfterInvocation(
-
context, sequenceId);
+
TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId);
+
//this sequence
has no more invocations
stopInvokerForTheSequence(sequenceId);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon
Jan 30 07:55:20 2006
@@ -172,7 +172,7 @@
//do time out
processing.
//TODO
uncomment below line
-
//TerminateManager.terminateSendingSide(context,sequenceID);
+
TerminateManager.terminateSendingSide(context,sequenceID,msgCtx.isServerSide());
String message
= "Sequence timed out";
log.debug(message);
@@ -185,14 +185,8 @@
.piggybackAckIfPresent(rmMsgCtx);
}
-
-
preSendTransaction.commit();
- if
(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
- int i =1;
- }
-
try {
//every message should
be resumed (pause==false) when sending
boolean paused =
msgCtx.isPaused();
@@ -258,8 +252,7 @@
ConfigurationContext
configContext = msgCtx
.getConfigurationContext();
-
TerminateManager.terminateSendingSide(
-
configContext, sequenceID);
+
TerminateManager.terminateSendingSide(configContext,
sequenceID,msgCtx.isServerSide());
//removing a entry from
the Listener
String transport =
msgCtx.getTransportOut().getName().getLocalPart();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]