Author: chamikara
Date: Sun May 14 03:52:36 2006
New Revision: 406310
URL: http://svn.apache.org/viewcvs?rev=406310&view=rev
Log:
Pausing the message context to stop it from proceeding when an exception occurs.
Correction to the cleaning logic.
All the data (including message contexts) should be cleared at the end
of the sequence, except for the data needed by reports (this will become
especially important when a permanent-storage-based storage manager is
present).
Added the removeMessageContext method to the storage manager.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Sun May 14 03:52:36 2006
@@ -163,11 +163,17 @@
}
} catch (Exception e) {
+ //message should not be sent in a exception situation.
+ msgContext.pause();
+
if (!withinTransaction) {
transaction.rollback();
msgContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
rolebacked = true;
}
+
+ String message = "Sandesha2 got an exception when
processing the in message";
+ throw new AxisFault (message,e);
} finally {
if (!withinTransaction && !rolebacked) {
transaction.commit();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
Sun May 14 03:52:36 2006
@@ -109,11 +109,17 @@
}
} catch (Exception e) {
+ //message should not be sent in a exception situation.
+ msgCtx.pause();
+
if (!withinTransaction) {
transaction.rollback();
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
rolebacked = true;
}
+
+ String message = "Sandesha2 got an exception when
processing the in message";
+ throw new AxisFault (message,e);
} finally {
if (!withinTransaction && !rolebacked) {
transaction.commit();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Sun May 14 03:52:36 2006
@@ -119,11 +119,18 @@
msgProcessor.processOutMessage(rmMsgCtx);
} catch (Exception e) {
+ //message should not be sent in a exception situation.
+ msgCtx.pause();
+
+ //rolling back the transaction
if (!withinTransaction) {
transaction.rollback();
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_FALSE);
rolebacked = true;
}
+
+ String message = "Sandesha2 got an exception when
processing the out message";
+ throw new AxisFault (message,e);
} finally {
if (!withinTransaction && !rolebacked) {
transaction.commit();
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Sun May 14 03:52:36 2006
@@ -166,8 +166,13 @@
for (long messageNo = lower; messageNo <= upper;
messageNo++) {
SenderBean retransmitterBean =
getRetransmitterEntry(
retransmitterEntriesOfSequence,
messageNo);
- if (retransmitterBean != null)
+ if (retransmitterBean != null) {
retransmitterMgr.delete(retransmitterBean.getMessageID());
+
+ //removing the application message from
the storage.
+ String storageKey =
retransmitterBean.getMessageContextRefKey();
+
storageManager.removeMessageContext(storageKey);
+ }
ackedMessagesList.add(new Long (messageNo));
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Sun May 14 03:52:36 2006
@@ -208,6 +208,8 @@
//updating the Highest_In_Msg_No property which gives the
highest message number retrieved from this sequence.
String highetsInMsgNoStr =
SandeshaUtil.getSequenceProperty(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,configCtx);
String highetsInMsgKey =
SandeshaUtil.getSequenceProperty(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configCtx);
+ if (highetsInMsgKey==null)
+ highetsInMsgKey = SandeshaUtil.getUUID();
long highestInMsgNo=0;
if (highetsInMsgNoStr!=null) {
@@ -219,7 +221,11 @@
String str = new Long(msgNo).toString();
SequencePropertyBean highestMsgNoBean = new
SequencePropertyBean
(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_NUMBER,str);
- SequencePropertyBean highestMsgKeyBean = new
SequencePropertyBean
(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,key);
+ SequencePropertyBean highestMsgKeyBean = new
SequencePropertyBean
(sequenceId,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,highetsInMsgKey);
+
+ //storing the new message as the highest in message.
+ storageManager.removeMessageContext(highetsInMsgKey);
+
storageManager.storeMessageContext(highetsInMsgKey,msgCtx);
if (highetsInMsgNoStr!=null) {
seqPropMgr.update(highestMsgNoBean);
@@ -415,6 +421,7 @@
// find internal sequence id
String internalSequenceId = null;
+
String storageKey = SandeshaUtil.getUUID(); //the key which
will be used to store this message.
/* Internal sequence id is the one used to refer to the
sequence (since
@@ -811,7 +818,7 @@
// sending the message once through Sandesha2TransportSender.
AxisEngine engine = new
AxisEngine(createSeqMsg.getConfigurationContext());
try {
- engine.send(createSeqMsg);
+ engine.resumeSend(createSeqMsg);
} catch (AxisFault e) {
throw new SandeshaException (e.getMessage());
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Sun May 14 03:52:36 2006
@@ -31,7 +31,6 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -75,7 +74,6 @@
.getSandeshaStorageManager(configCtx);
//Processing for ack if available
-/// Transaction ackProcessTransaction =
storageManager.getTransaction();
SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement)
createSeqResponseRMMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -84,12 +82,8 @@
ackProcessor.processInMessage(createSeqResponseRMMsgCtx);
}
-/// ackProcessTransaction.commit();
-
//Processing the create sequence response.
-/// Transaction createSeqResponseTransaction =
storageManager.getTransaction();
-
CreateSequenceResponse createSeqResponsePart =
(CreateSequenceResponse) createSeqResponseRMMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
if (createSeqResponsePart == null) {
@@ -137,6 +131,14 @@
createSeqBean.setSequenceID(newOutSequenceId);
createSeqMgr.update(createSeqBean);
+ SenderBean createSequenceSenderBean =
retransmitterMgr.retrieve(createSeqMsgId);
+ if (createSequenceSenderBean==null)
+ throw new SandeshaException ("Create sequence entry is
not found");
+
+ //removing the Create Sequence Message from the storage
+ String createSeqStorageKey =
createSequenceSenderBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(createSeqStorageKey);
+
//deleting the create sequence entry.
retransmitterMgr.delete(createSeqMsgId);
@@ -154,11 +156,6 @@
sequencePropMgr.insert(outSequenceBean);
sequencePropMgr.insert(internalSequenceBean);
-/// createSeqResponseTransaction.commit();
-
-
-/// Transaction offerProcessTransaction =
storageManager.getTransaction();
-
//processing for accept (offer has been sent)
Accept accept = createSeqResponsePart.getAccept();
if (accept != null) {
@@ -217,10 +214,6 @@
}
-/// offerProcessTransaction.commit();
-
-/// Transaction updateAppMessagesTransaction =
storageManager.getTransaction();
-
SenderBean target = new SenderBean();
target.setInternalSequenceID(internalSequenceId);
target.setSend(false);
@@ -281,12 +274,8 @@
//updating the message. this will correct the SOAP
envelope string.
storageManager.updateMessageContext(key,applicationMsg);
}
-
-/// updateAppMessagesTransaction.commit();
-/// Transaction lastUpdatedTimeTransaction =
storageManager.getTransaction();
SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx);
-/// lastUpdatedTimeTransaction.commit();
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
.setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Sun May 14 03:52:36 2006
@@ -175,6 +175,8 @@
seqPropMgr.insert(lastInMsgBean);
MessageContext highestInMsg =
storageManager.retrieveMessageContext(highestImMsgKey,configCtx);
+
+ //TODO get the out message in a storage
friendly manner.
MessageContext highestOutMessage =
highestInMsg.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_OUT_VALUE);
if (highestOutMessage!=null) {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/SandeshaStorageException.java
Sun May 14 03:52:36 2006
@@ -33,4 +33,8 @@
super (e);
}
+ public SandeshaStorageException (String m,Exception e) {
+ super (m,e);
+ }
+
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
Sun May 14 03:52:36 2006
@@ -69,4 +69,7 @@
public abstract void updateMessageContext (String
storageKey,MessageContext msgContext) throws SandeshaStorageException;
public abstract MessageContext retrieveMessageContext (String
storageKey, ConfigurationContext configContext) throws SandeshaStorageException;
+
+ public abstract void removeMessageContext (String storageKey) throws
SandeshaStorageException;
+
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
Sun May 14 03:52:36 2006
@@ -131,6 +131,18 @@
storeMessageContext(key,msgContext);
}
+ public void removeMessageContext(String key) throws
SandeshaStorageException {
+ HashMap storageMap = (HashMap)
getContext().getProperty(MESSAGE_MAP_KEY);
+
+ if (storageMap==null) {
+ return;
+ }
+
+ Object entry = storageMap.get(key);
+ if (entry!=null)
+ storageMap.remove(key);
+ }
+
public void initStorage (AxisModule moduleDesc) {
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java
Sun May 14 03:52:36 2006
@@ -330,7 +330,12 @@
if (it.hasNext()) {
SenderBean oldAckBean = (SenderBean) it.next();
timeToSend = oldAckBean.getTimeToSend();
//If there is an old ack. This ack will be sent in the old timeToSend.
+
+ //removing the retransmitted entry for the
oldAck
retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+
+ //removing the message store entry for the old
ack
+
storageManager.removeMessageContext(oldAckBean.getMessageContextRefKey());
}
ackBean.setTimeToSend(timeToSend);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
Sun May 14 03:52:36 2006
@@ -312,6 +312,8 @@
SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
+
+
updateClientSideListnerIfNeeded
(firstAplicationMsgCtx,anonymousURI);
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java
Sun May 14 03:52:36 2006
@@ -27,6 +27,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
import org.apache.commons.logging.Log;
@@ -112,6 +113,12 @@
while (iterator.hasNext()) {
InvokerBean storageMapBean = (InvokerBean)
iterator.next();
storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
+
+ //removing the respective message context from the
message store. If this is an in-only message.
+ //In-out message will be deleted when a ack is
retrieved for the out message.
+ String messageStoreKey =
storageMapBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoreKey);
+
}
String cleanStatus = (String)
receivingSideCleanMap.get(sequenceID);
@@ -138,7 +145,13 @@
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
NextMsgBean nextMsgBean = (NextMsgBean) iterator.next();
- //nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+// nextMsgBeanMgr.delete(nextMsgBean.getSequenceID());
+ }
+
+ //removing the HighestInMessage entry.
+ String highestInMessageKey =
SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.HIGHEST_IN_MSG_KEY,configContext);
+ if (highestInMessageKey!=null) {
+
storageManager.removeMessageContext(highestInMessageKey);
}
removeReceivingSideProperties(configContext,sequenceID);
@@ -289,12 +302,15 @@
}
}
- //removing retransmitterMgr entries
+ //removing retransmitterMgr entries and corresponding message
contexts.
Collection collection =
retransmitterBeanMgr.find(internalSequenceID);
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
SenderBean retransmitterBean = (SenderBean)
iterator.next();
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+ String messageStoreKey =
retransmitterBean.getMessageContextRefKey();
+ storageManager.removeMessageContext(messageStoreKey);
}
//removing the createSeqMgrEntry
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
Sun May 14 03:52:36 2006
@@ -178,6 +178,12 @@
.resume(msgToInvoke);
invoked = true;
storageMapMgr.delete(key);
+
+ //removing the
corresponding message context as well.
+ MessageContext msgCtx =
storageManager.retrieveMessageContext(key,context);
+ if (msgCtx!=null) {
+
storageManager.removeMessageContext(key);
+ }
} catch (AxisFault e) {
throw new
SandeshaException(e);
} finally {
@@ -205,7 +211,9 @@
}
} catch (Exception e1) {
- e1.printStackTrace();
+ String message = "Sandesha2 got an exception
when trying to invoke the message";
+ log.debug(message,e1);
+
if (transaction!=null) {
transaction.rollback();
rolebacked = true;
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=406310&r1=406309&r2=406310&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun
May 14 03:52:36 2006
@@ -118,15 +118,16 @@
if (senderBean==null) {
continue;
}
-
+
+ String key = (String)
senderBean.getMessageContextRefKey();
+ MessageContext msgCtx =
storageManager.retrieveMessageContext(key, context);
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
+
MessageRetransmissionAdjuster
retransmitterAdjuster = new MessageRetransmissionAdjuster();
boolean continueSending =
retransmitterAdjuster.adjustRetransmittion(senderBean, context);
if (!continueSending) {
continue;
}
-
- String key = (String)
senderBean.getMessageContextRefKey();
- MessageContext msgCtx =
storageManager.retrieveMessageContext(key, context);
if (msgCtx == null) {
String message = "Message context is
not present in the storage";
@@ -177,21 +178,23 @@
TransportOutDescription transportOutDescription
= msgCtx.getTransportOut();
TransportSender transportSender =
transportOutDescription.getSender();
- //have to commit the transaction before
sending. This may get changed when WS-AT is available.
- transaction.commit();
-
boolean successfullySent = false;
if (transportSender != null) {
+
+ //have to commit the transaction before
sending. This may get changed when WS-AT is available.
+ transaction.commit();
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE);
try {
//TODO change this to cater for
security.
transportSender.invoke(msgCtx);
successfullySent = true;
} catch (AxisFault e) {
// TODO Auto-generated catch
block
- log.debug("Could not send message");
-
log.debug(e.getStackTrace().toString());
+ String message = "Sandesha2 got an
exception when trying to send the message";
+ log.debug(message,e);
} finally {
transaction =
storageManager.getTransaction();
+
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
}
}
@@ -202,11 +205,14 @@
bean1.setSentCount(senderBean.getSentCount());
bean1.setTimeToSend(senderBean.getTimeToSend());
mgr.update(bean1);
- } else
+ } else {
mgr.delete(bean1.getMessageID());
+
+ //removing the message from the
storage.
+ String messageStoredKey =
bean1.getMessageContextRefKey();
+
storageManager.removeMessageContext(messageStoredKey);
+ }
}
-
-
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE);
if (successfullySent) {
if (!msgCtx.isServerSide())
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]