Author: gatfora
Date: Fri Jan 12 07:45:34 2007
New Revision: 495613
URL: http://svn.apache.org/viewvc?view=rev&rev=495613
Log:
Move the OutOfOrderRanges, TermianteReceived, lastActivatedTime,
noOutGoingMsgs, transportTo, sequenceClosedClient and lastInMsgId to
RMS/RMDBeans
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
Fri Jan 12 07:45:34 2007
@@ -253,11 +253,6 @@
// property. This is used as the
// sequenceId to share data b/w
// sequences
-
- //For IN_ORDER sequences, we can have finite ranges of messages
that can be
- //delivered out of order. These are maintained as a String that
is consistent
- //with the form described in
org.apache.sandesha2.util.RangeString
- String OUT_OF_ORDER_RANGES = "OutOfOrderRanges";
String INTERNAL_SEQUENCE_ID = "TempSequenceId";
@@ -265,25 +260,10 @@
String OFFERED_SEQUENCE = "OfferedSequence";
- String TERMINATE_RECEIVED = "TerminateReceived";
-
- String LAST_ACTIVATED_TIME = "LastActivatedTime";
-
- String NO_OF_OUTGOING_MSGS_ACKED = "NoOfOutGoingMessagesAcked";
-
- String TRANSPORT_TO = "TransportTo";
-
String SEQUENCE_CLOSED = "SequenceClosed";
- String SEQUENCE_CLOSED_CLIENT = "SequenceClosedClient";
//indicates the client has sent a close sequence
-
String SEQUENCE_TIMED_OUT = "SequenceTimedOut";
-
- // Once an inbound sequence is closed, or we receive a message
with the
- // 'LastMessage' marker, we record the message id of the
highest message
- // in the sequence.
- String LAST_IN_MSG_ID = "LastInMessageId";
-
+
String SECURITY_TOKEN = "SecurityToken";
String SOAP_VERSION = "SOAPVersion";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Fri Jan 12 07:45:34 2007
@@ -168,8 +168,9 @@
}
}
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
// updating the last activated time of the sequence.
- SequenceManager.updateLastActivatedTime(sequencePropertyKey,
storageManager);
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
while (nackIterator.hasNext()) {
Nack nack = (Nack) nackIterator.next();
@@ -193,33 +194,14 @@
// setting acked message date.
// TODO add details specific to each message.
- long noOfMsgsAcked =
getNoOfMessagesAcked(sequenceAck.getAcknowledgementRanges().iterator());
- SequencePropertyBean noOfMsgsAckedBean =
seqPropMgr.retrieve(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
- boolean added = false;
-
- if (noOfMsgsAckedBean == null) {
- added = true;
- noOfMsgsAckedBean = new SequencePropertyBean();
-
noOfMsgsAckedBean.setSequencePropertyKey(sequencePropertyKey);
-
noOfMsgsAckedBean.setName(Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED);
- }
-
- noOfMsgsAckedBean.setValue(Long.toString(noOfMsgsAcked));
-
- if (added)
- seqPropMgr.insert(noOfMsgsAckedBean);
- else
- seqPropMgr.update(noOfMsgsAckedBean);
-
- // setting the completed_messages list. This gives all the
messages of
- // the sequence that were acked.
- RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
// Set the completed message list, but only if we have actually
removed a SenderBean
// It is possible for the ACK messages arrive out of sequence
- if (removedSenderBean)
+ if (removedSenderBean) {
rmsBean.setClientCompletedMessages(ackedMessagesList);
+ long noOfMsgsAcked = ackedMessagesList.size();
+ rmsBean.setNumberOfMessagesAcked(noOfMsgsAcked);
+ }
long highestOutMsgNo = rmsBean.getLastOutMessage();
@@ -255,20 +237,4 @@
return null;
}
-
- private static long getNoOfMessagesAcked(Iterator ackRangeIterator) {
- long noOfMsgs = 0;
- while (ackRangeIterator.hasNext()) {
- AcknowledgementRange acknowledgementRange =
(AcknowledgementRange) ackRangeIterator.next();
- long lower = acknowledgementRange.getLowerValue();
- long upper = acknowledgementRange.getUpperValue();
-
- for (long i = lower; i <= upper; i++) {
- noOfMsgs++;
- }
- }
-
- return noOfMsgs;
- }
-
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Fri Jan 12 07:45:34 2007
@@ -137,10 +137,11 @@
internalSequenceId =
SandeshaUtil.getOutgoingSideInternalSequenceID(inboundSequence);
- // Deciding weather this is the last message. We assume
it is if it relates to
+ // Deciding whether this is the last message. We assume
it is if it relates to
// a message which arrived with the LastMessage flag on
it.
- String lastRequestId =
SandeshaUtil.getSequenceProperty(inboundSequence,
-
Sandesha2Constants.SequenceProperties.LAST_IN_MSG_ID, storageManager);
+ RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
+ // Get the last in message
+ String lastRequestId = rmdBean.getLastInMessageId();
RelatesTo relatesTo = msgContext.getRelatesTo();
if(relatesTo != null && lastRequestId != null &&
lastRequestId.equals(relatesTo.getValue())) {
@@ -196,13 +197,13 @@
if (dummyMessageString != null &&
Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
dummyMessage = true;
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
+
//see if the sequence is closed
- SequencePropertyBean sequenceClosed =
seqPropMgr.retrieve(sequencePropertyKey,
Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED_CLIENT);
- if(sequenceClosed!=null){
+ if(rmsBean != null && rmsBean.isSequenceClosedClient()){
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed,
internalSequenceId));
}
- RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
//see if the sequence is terminated
if(rmsBean != null && rmsBean.isTerminateAdded()) {
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated,
internalSequenceId));
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
Fri Jan 12 07:45:34 2007
@@ -159,11 +159,8 @@
setupOutMessage(rmMsgCtx);
//write into the sequence proeprties that the client is now
closed
- SequencePropertyBean sequenceClosedBean = new
SequencePropertyBean();
-
sequenceClosedBean.setSequencePropertyKey(getInternalSequenceID());
-
sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED_CLIENT);
- sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
-
getStorageManager().getSequencePropertyBeanMgr().insert(sequenceClosedBean);
+ getRMSBean().setSequenceClosedClient(true);
+ getStorageManager().getRMSBeanMgr().update(getRMSBean());
AxisOperation closeOperation =
SpecSpecificConstants.getWSRMOperation(
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE,
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Fri Jan 12 07:45:34 2007
@@ -99,158 +99,145 @@
MessageContext outMessage = null;
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
- try {
- // Create the new sequence id, as well as establishing
the beans that handle the
- // sequence state.
- RMDBean rmdBean =
SequenceManager.setupNewSequence(createSeqRMMsg, storageManager, secManager,
token);
-
- RMMsgContext createSeqResponse =
RMMsgCreator.createCreateSeqResponseMsg(createSeqRMMsg,
rmdBean.getSequenceID());
- outMessage = createSeqResponse.getMessageContext();
-
- createSeqResponse.setFlow(MessageContext.OUT_FLOW);
-
-
createSeqResponse.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
"true"); // for
-
// making
-
// sure
-
// that
-
// this
-
// wont
-
// be
-
// processed
-
// again.
- CreateSequenceResponse createSeqResPart =
(CreateSequenceResponse) createSeqResponse
-
.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
+ // Create the new sequence id, as well as establishing the
beans that handle the
+ // sequence state.
+ RMDBean rmdBean =
SequenceManager.setupNewSequence(createSeqRMMsg, storageManager, secManager,
token);
+
+ RMMsgContext createSeqResponse =
RMMsgCreator.createCreateSeqResponseMsg(createSeqRMMsg,
rmdBean.getSequenceID());
+ outMessage = createSeqResponse.getMessageContext();
+
+ createSeqResponse.setFlow(MessageContext.OUT_FLOW);
- // OFFER PROCESSING
- SequenceOffer offer = createSeqPart.getSequenceOffer();
- if (offer != null) {
- Accept accept = createSeqResPart.getAccept();
- if (accept == null) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAcceptPart);
- log.debug(message);
- throw new SandeshaException(message);
- }
+ // for making sure that this won't be processed again
+
createSeqResponse.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
"true");
+
+ CreateSequenceResponse createSeqResPart =
(CreateSequenceResponse) createSeqResponse
+
.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE);
- String offeredSequenceID =
offer.getIdentifer().getIdentifier(); // offered
-
// seq.
-
// id.
-
- boolean offerEcepted =
offerAccepted(offeredSequenceID, context, createSeqRMMsg, storageManager);
-
- if (offerEcepted) {
- // Setting the CreateSequence table
entry for the outgoing
- // side.
- RMSBean rMSBean = new RMSBean();
-
rMSBean.setSequenceID(offeredSequenceID);
- String outgoingSideInternalSequenceId =
SandeshaUtil
-
.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
-
rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
- // this is a dummy value
-
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
-
- rMSBean.setToEPR(rmdBean.getToEPR());
-
rMSBean.setAcksToEPR(rmdBean.getAcksToEPR());
-
rMSBean.setReplyToEPR(rmdBean.getReplyToEPR());
-
- String outgoingSideSequencePropertyKey
= outgoingSideInternalSequenceId;
+ // OFFER PROCESSING
+ SequenceOffer offer = createSeqPart.getSequenceOffer();
+ if (offer != null) {
+ Accept accept = createSeqResPart.getAccept();
+ if (accept == null) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAcceptPart);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
- RMSBeanMgr createSeqMgr =
storageManager.getRMSBeanMgr();
- createSeqMgr.insert(rMSBean);
+ // offered seq id
+ String offeredSequenceID =
offer.getIdentifer().getIdentifier();
+
+ boolean offerEcepted = offerAccepted(offeredSequenceID,
context, createSeqRMMsg, storageManager);
- // Setting sequence properties for the
outgoing sequence.
- // Only will be used by the server side
response path. Will
- // be wasted properties for the client
side.
-
- // setting the internal_sequence_id
- SequencePropertyBean
internalSequenceBean = new SequencePropertyBean();
-
internalSequenceBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
-
internalSequenceBean.setSequencePropertyKey(offeredSequenceID);
-
internalSequenceBean.setValue(outgoingSideInternalSequenceId);
- seqPropMgr.insert(internalSequenceBean);
+ if (offerEcepted) {
+ // Setting the CreateSequence table entry for
the outgoing
+ // side.
+ RMSBean rMSBean = new RMSBean();
+ rMSBean.setSequenceID(offeredSequenceID);
+ String outgoingSideInternalSequenceId =
SandeshaUtil
+
.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
+ // this is a dummy value
+
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
- Endpoint endpoint = offer.getEndpoint();
- if (endpoint!=null) {
- // setting the OfferedEndpoint
- SequencePropertyBean
offeredEndpointBean = new SequencePropertyBean();
-
offeredEndpointBean.setName(Sandesha2Constants.SequenceProperties.OFFERED_ENDPOINT);
+ rMSBean.setToEPR(rmdBean.getToEPR());
+ rMSBean.setAcksToEPR(rmdBean.getAcksToEPR());
+ rMSBean.setReplyToEPR(rmdBean.getReplyToEPR());
+
rMSBean.setLastActivatedTime(System.currentTimeMillis());
+
+ String outgoingSideSequencePropertyKey =
outgoingSideInternalSequenceId;
+
+ RMSBeanMgr rmsBeanMgr =
storageManager.getRMSBeanMgr();
+ rmsBeanMgr.insert(rMSBean);
+
+ // Setting sequence properties for the outgoing
sequence.
+ // Only will be used by the server side
response path. Will
+ // be wasted properties for the client side.
+
+ // setting the internal_sequence_id
+ SequencePropertyBean internalSequenceBean = new
SequencePropertyBean();
+
internalSequenceBean.setName(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+
internalSequenceBean.setSequencePropertyKey(offeredSequenceID);
+
internalSequenceBean.setValue(outgoingSideInternalSequenceId);
+ seqPropMgr.insert(internalSequenceBean);
- //currently we can only
serialize the Address part of the Endpoint.
- //TODO correct this to
serialize the whole EPR.
-
offeredEndpointBean.setValue(endpoint.getEPR().getAddress());
-
offeredEndpointBean.setSequencePropertyKey(outgoingSideSequencePropertyKey);
-
seqPropMgr.insert(offeredEndpointBean);
- }
+ Endpoint endpoint = offer.getEndpoint();
+ if (endpoint!=null) {
+ // setting the OfferedEndpoint
+ SequencePropertyBean
offeredEndpointBean = new SequencePropertyBean();
+
offeredEndpointBean.setName(Sandesha2Constants.SequenceProperties.OFFERED_ENDPOINT);
+
+ //currently we can only serialize the
Address part of the Endpoint.
+ //TODO correct this to serialize the
whole EPR.
+
offeredEndpointBean.setValue(endpoint.getEPR().getAddress());
+
offeredEndpointBean.setSequencePropertyKey(outgoingSideSequencePropertyKey);
+ seqPropMgr.insert(offeredEndpointBean);
+ }
- // Store the inbound token (if any)
with the new sequence
- if(token != null) {
- String tokenData =
secManager.getTokenRecoveryData(token);
- SequencePropertyBean tokenBean
= new SequencePropertyBean(
-
outgoingSideSequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
- tokenData);
- seqPropMgr.insert(tokenBean);
- }
- } else {
- // removing the accept part.
- createSeqResPart.setAccept(null);
- createSeqResponse.addSOAPEnvelope();
+ // Store the inbound token (if any) with the
new sequence
+ if(token != null) {
+ String tokenData =
secManager.getTokenRecoveryData(token);
+ SequencePropertyBean tokenBean = new
SequencePropertyBean(
+
outgoingSideSequencePropertyKey,
+
Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
+ tokenData);
+ seqPropMgr.insert(tokenBean);
}
+ } else {
+ // removing the accept part.
+ createSeqResPart.setAccept(null);
+ createSeqResponse.addSOAPEnvelope();
}
+ }
// Add this sequence to the list of inbound sequences
- SequencePropertyBean incomingSequenceListBean =
seqPropMgr.retrieve(
-
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
- if (incomingSequenceListBean == null) {
- incomingSequenceListBean = new
SequencePropertyBean();
-
incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
-
incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
- incomingSequenceListBean.setValue(null);
-
- // this get inserted before
- seqPropMgr.insert(incomingSequenceListBean);
- }
+ SequencePropertyBean incomingSequenceListBean =
seqPropMgr.retrieve(
+
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (incomingSequenceListBean == null) {
+ incomingSequenceListBean = new SequencePropertyBean();
+
incomingSequenceListBean.setSequencePropertyKey(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+
incomingSequenceListBean.setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ incomingSequenceListBean.setValue(null);
- ArrayList incomingSequenceList =
SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
- incomingSequenceList.add(rmdBean.getSequenceID());
-
incomingSequenceListBean.setValue(incomingSequenceList.toString());
- seqPropMgr.update(incomingSequenceListBean);
+ // this get inserted before
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
+ ArrayList incomingSequenceList =
SandeshaUtil.getArrayListFromString(incomingSequenceListBean.getValue());
+ incomingSequenceList.add(rmdBean.getSequenceID());
+
incomingSequenceListBean.setValue(incomingSequenceList.toString());
+ seqPropMgr.update(incomingSequenceListBean);
- //TODO add createSequenceResponse message as the
referenceMessage to the RMDBean.
-
- outMessage.setResponseWritten(true);
+ //TODO add createSequenceResponse message as the
referenceMessage to the RMDBean.
- // commiting tr. before sending the response msg.
+ outMessage.setResponseWritten(true);
-
SequenceManager.updateLastActivatedTime(rmdBean.getSequenceID(),
storageManager);
+ rmdBean.setLastActivatedTime(System.currentTimeMillis());
+ storageManager.getRMDBeanMgr().update(rmdBean);
- AxisEngine engine = new AxisEngine(context);
- try{
- engine.send(outMessage);
- }
- catch(AxisFault e){
- throw new SandeshaException(
-
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCreateSeqResponse,
e.toString()),
- e);
- }
+ AxisEngine engine = new AxisEngine(context);
+ try{
+ engine.send(outMessage);
+ }
+ catch(AxisFault e){
+ throw new SandeshaException(
+
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCreateSeqResponse,
e.toString()),
+ e);
+ }
- boolean anon = true;
- if (rmdBean.getToEPR() != null) {
- EndpointReference toEPR = new
EndpointReference(rmdBean.getToEPR());
- if (!toEPR.hasAnonymousAddress()) anon = false;
- }
- if(anon) {
-
createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
"true");
- } else {
+ boolean anon = true;
+ if (rmdBean.getToEPR() != null) {
+ EndpointReference toEPR = new
EndpointReference(rmdBean.getToEPR());
+ if (!toEPR.hasAnonymousAddress()) anon = false;
+ }
+ if(anon) {
+
createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
"true");
+ } else {
createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
"false");
- }
-
- } catch (AxisFault e1) {
- throw new SandeshaException(e1);
}
-
+
createSeqRMMsg.pause();
if (log.isDebugEnabled())
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Fri Jan 12 07:45:34 2007
@@ -48,7 +48,6 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.wsrm.Accept;
import org.apache.sandesha2.wsrm.AckRequested;
@@ -110,9 +109,9 @@
}
SenderBeanMgr retransmitterMgr =
storageManager.getSenderBeanMgr();
- RMSBeanMgr createSeqMgr = storageManager.getRMSBeanMgr();
+ RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
- RMSBean rmsBean = createSeqMgr.retrieve(createSeqMsgId);
+ RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId);
if (rmsBean == null) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound);
log.debug(message);
@@ -231,7 +230,8 @@
RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
rmdBeanMgr.insert(rMDBean);
- createSeqMgr.update(rmsBean);
+
rmsBean.setLastActivatedTime(System.currentTimeMillis());
+ rmsBeanMgr.update(rmsBean);
// Store the security token for the offered sequence
if(tokenData != null) {
@@ -365,8 +365,6 @@
// updating the message. this will correct the SOAP
envelope string.
storageManager.updateMessageContext(key,
applicationMsg);
}
-
- SequenceManager.updateLastActivatedTime(sequencePropertyKey,
storageManager);
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
org.apache.axis2.Constants.RESPONSE_WRITTEN,
"false");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Fri Jan 12 07:45:34 2007
@@ -55,7 +55,6 @@
import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.Sequence;
/**
@@ -138,9 +137,6 @@
FaultManager.checkForSequenceClosed(rmMsgCtx, sequenceId,
storageManager);
FaultManager.checkForLastMsgNumberExceeded(rmMsgCtx,
storageManager);
- // updating the last activated time of the sequence.
- SequenceManager.updateLastActivatedTime(propertyKey,
storageManager);
-
long msgNo = sequence.getMessageNumber().getMessageNumber();
if (msgNo == 0) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
@@ -157,6 +153,9 @@
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotFindSequence,
sequenceId));
}
+
+ // updating the last activated time of the sequence.
+ bean.setLastActivatedTime(System.currentTimeMillis());
String key = SandeshaUtil.getUUID(); // key to store the
message.
// updating the Highest_In_Msg_No property which gives the
highest
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Fri Jan 12 07:45:34 2007
@@ -48,7 +48,6 @@
import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.util.WSRMMessageSender;
@@ -104,13 +103,6 @@
FaultManager.checkForUnknownSequence(terminateSeqRMMsg,
sequenceId, storageManager);
- SequencePropertyBean terminateReceivedBean = new
SequencePropertyBean();
-
terminateReceivedBean.setSequencePropertyKey(sequencePropertyKey);
-
terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED);
- terminateReceivedBean.setValue("true");
-
- sequencePropertyBeanMgr.insert(terminateReceivedBean);
-
// add the terminate sequence response if required.
RMMsgContext terminateSequenceResponse = null;
if
(SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
@@ -120,12 +112,11 @@
RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
rmdBean.setTerminated(true);
+ rmdBean.setLastActivatedTime(System.currentTimeMillis());
storageManager.getRMDBeanMgr().update(rmdBean);
TerminateManager.cleanReceivingSideOnTerminateMessage(context,
sequencePropertyKey, sequenceId, storageManager);
- SequenceManager.updateLastActivatedTime(sequencePropertyKey,
storageManager);
-
//sending the terminate sequence response
if (terminateSequenceResponse != null) {
@@ -134,11 +125,9 @@
AxisEngine engine = new AxisEngine(terminateSeqMsg
.getConfigurationContext());
-
-
+
outMessage.setServerSide(true);
-
-
+
engine.send(outMessage);
if (toEPR.hasAnonymousAddress()) {
@@ -164,8 +153,6 @@
if (log.isDebugEnabled())
log.debug("Enter:
TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
- SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
-
RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
RMDBean bean = mgr.retrieve(sequenceId);
@@ -187,9 +174,10 @@
// Mark up the highest inbound message as if it
had the last message flag on it.
//
String inMsgId = bean.getHighestInMessageId();
- SequencePropertyBean lastInMsgBean = new
SequencePropertyBean(requestSidesequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.LAST_IN_MSG_ID,
bean.getHighestInMessageId());
- seqPropMgr.insert(lastInMsgBean);
+ bean.setLastInMessageId(inMsgId);
+
+ // Update the RMDBean
+ storageManager.getRMDBeanMgr().update(bean);
// If an outbound message has already gone out
with that relatesTo, then we can terminate
// right away.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
Fri Jan 12 07:45:34 2007
@@ -42,11 +42,26 @@
private long highestInMessageNumber = 0;
private String highestInMessageId;
-
- /** For incoming sequences this gives the msg no's of the messages that
were
+
+ /**
+ * Once an inbound sequence is closed, or we receive a message with the
+ * 'LastMessage' marker, we record the message id of the highest message
+ * in the sequence.
+ */
+ private String lastInMessageId;
+
+ /**
+ * For incoming sequences this gives the msg no's of the messages that
were
* received (may be an ack was sent - depending on the policy)
*/
private List serverCompletedMessages = null;
+
+ /**
+ * For IN_ORDER sequences, we can have finite ranges of messages that
can be
+ * delivered out of order. These are maintained as a String that is
consistent
+ * with the form described in org.apache.sandesha2.util.RangeString
+ */
+ private String outOfOrderRanges = null;
public RMDBean() {
@@ -95,6 +110,30 @@
public void setHighestInMessageNumber(long highestInMessageNumber) {
this.highestInMessageNumber = highestInMessageNumber;
}
+
+ public List getServerCompletedMessages() {
+ return serverCompletedMessages;
+ }
+
+ public void setServerCompletedMessages(List serverCompletedMessages) {
+ this.serverCompletedMessages = serverCompletedMessages;
+ }
+
+ public String getLastInMessageId() {
+ return lastInMessageId;
+ }
+
+ public void setLastInMessageId(String lastInMessageId) {
+ this.lastInMessageId = lastInMessageId;
+ }
+
+ public String getOutOfOrderRanges() {
+ return outOfOrderRanges;
+ }
+
+ public void setOutOfOrderRanges(String outOfOrderRanges) {
+ this.outOfOrderRanges = outOfOrderRanges;
+ }
public String toString() {
StringBuffer result = new StringBuffer();
@@ -104,15 +143,8 @@
result.append("\nRef Msg Key: ");
result.append(referenceMessageKey);
result.append("\nHishestInMessageNumber: ");
result.append(highestInMessageNumber);
result.append("\nHishestInMessageKey: ");
result.append(highestInMessageId);
+ result.append("\nLastInMessageId: ");
result.append(lastInMessageId);
+ result.append("\nOutOfOrderRanges :");
result.append(outOfOrderRanges);
return result.toString();
}
-
- public List getServerCompletedMessages() {
- return serverCompletedMessages;
- }
-
- public void setServerCompletedMessages(List serverCompletedMessages) {
- this.serverCompletedMessages = serverCompletedMessages;
- }
-
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
Fri Jan 12 07:45:34 2007
@@ -105,6 +105,18 @@
* Indicates that a terminate sequence message was added.
*/
private boolean terminateAdded = false;
+
+ /**
+ * The number of messages that were acked
+ */
+ private long numberOfMessagesAcked = 0;
+
+ /**
+ * Indicates the client has sent a close sequence
+ */
+ private boolean sequenceClosedClient = false;
+
+ private String transportTo;
public RMSBean() {
}
@@ -142,7 +154,6 @@
this.createSequenceMsgStoreKey = createSequenceMsgStoreKey;
}
-
public String getReferenceMessageStoreKey() {
return referenceMessageStoreKey;
}
@@ -159,16 +170,13 @@
this.lastSendError = lastSendError;
}
-
public long getLastSendErrorTimestamp() {
return lastSendErrorTimestamp;
}
-
public void setLastSendErrorTimestamp(long lastSendErrorTimestamp) {
this.lastSendErrorTimestamp = lastSendErrorTimestamp;
}
-
public long getLastOutMessage() {
return lastOutMessage;
@@ -182,17 +190,14 @@
return highestOutMessageNumber;
}
-
public void setHighestOutMessageNumber(long highestOutMessageNumber) {
this.highestOutMessageNumber = highestOutMessageNumber;
}
-
public String getHighestOutRelatesTo() {
return highestOutRelatesTo;
}
-
public void setHighestOutRelatesTo(String highestOutRelatesTo) {
this.highestOutRelatesTo = highestOutRelatesTo;
}
@@ -201,7 +206,6 @@
return nextMessageNumber;
}
-
public void setNextMessageNumber(long nextMessageNumber) {
this.nextMessageNumber = nextMessageNumber;
}
@@ -210,21 +214,42 @@
return clientCompletedMessages;
}
-
public void setClientCompletedMessages(List clientCompletedMessages) {
this.clientCompletedMessages = clientCompletedMessages;
}
-
public boolean isTerminateAdded() {
return terminateAdded;
}
-
public void setTerminateAdded(boolean terminateAdded) {
this.terminateAdded = terminateAdded;
}
+
+ public boolean isSequenceClosedClient() {
+ return sequenceClosedClient;
+ }
+
+ public void setSequenceClosedClient(boolean sequenceClosedClient) {
+ this.sequenceClosedClient = sequenceClosedClient;
+ }
+
+ public long getNumberOfMessagesAcked() {
+ return numberOfMessagesAcked;
+ }
+
+ public void setNumberOfMessagesAcked(long numberOfMessagesAcked) {
+ this.numberOfMessagesAcked = numberOfMessagesAcked;
+ }
+ public String getTransportTo() {
+ return transportTo;
+ }
+
+ public void setTransportTo(String transportTo) {
+ this.transportTo = transportTo;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -239,7 +264,9 @@
result.append("\nHighestOutRelatesTo:
");result.append(highestOutRelatesTo);
result.append("\nNextMessageNumber: ");
result.append(nextMessageNumber);
result.append("\nTerminateAdded : ");
result.append(terminateAdded);
+ result.append("\nClosedClient : ");
result.append(sequenceClosedClient);
+ result.append("\nNumAckedMsgs : ");
result.append(numberOfMessagesAcked);
+ result.append("\nTransportTo : ");
result.append(transportTo);
return result.toString();
}
-
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSequenceBean.java
Fri Jan 12 07:45:34 2007
@@ -36,6 +36,8 @@
private String replyToEPR;
private String acksToEPR;
+
+ private long lastActivatedTime;
/**
* Indicates that a sequence is terminated
@@ -108,11 +110,18 @@
return terminated;
}
-
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
+ public long getLastActivatedTime() {
+ return lastActivatedTime;
+ }
+
+ public void setLastActivatedTime(long lastActivatedTime) {
+ this.lastActivatedTime = lastActivatedTime;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append("\nSequence Id: "); result.append(sequenceID);
@@ -121,6 +130,7 @@
result.append("\nacksToEPR : "); result.append(acksToEPR);
result.append("\nPolling : "); result.append(pollingMode);
result.append("\nTerminated : ");
result.append(terminated);
+ result.append("\nLastActivatedTime: ");
result.append(lastActivatedTime);
return result.toString();
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Fri Jan 12 07:45:34 2007
@@ -269,12 +269,7 @@
// saving transportTo value;
String transportTo = (String)
firstAplicationMsgCtx.getProperty(Constants.Configuration.TRANSPORT_URL);
if (transportTo != null) {
- SequencePropertyBean transportToBean = new
SequencePropertyBean();
-
transportToBean.setSequencePropertyKey(sequencePropertyKey);
-
transportToBean.setName(Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
- transportToBean.setValue(transportTo);
-
- seqPropMgr.insert(transportToBean);
+ rmsBean.setTransportTo(transportTo);
}
// setting the spec version for the client side.
@@ -285,7 +280,7 @@
seqPropMgr.insert(specVerionBean);
// updating the last activated time.
- updateLastActivatedTime(sequencePropertyKey, storageManager);
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
SandeshaUtil.startSenderForTheSequence(configurationContext,
sequencePropertyKey);
@@ -363,58 +358,7 @@
}
- /**
- * Takes the internalSeqID as the param. Not the sequenceID.
- *
- * @param internalSequenceID
- * @param configContext
- * @throws SandeshaException
- */
- public static void updateLastActivatedTime(String sequencePropertyKey,
StorageManager storageManager)
- throws SandeshaException {
- // Transaction lastActivatedTransaction =
- // storageManager.getTransaction();
- SequencePropertyBeanMgr sequencePropertyBeanMgr =
storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean lastActivatedBean =
sequencePropertyBeanMgr.retrieve(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
-
- boolean added = false;
-
- if (lastActivatedBean == null) {
- added = true;
- lastActivatedBean = new SequencePropertyBean();
-
lastActivatedBean.setSequencePropertyKey(sequencePropertyKey);
-
lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
- }
-
- long currentTime = System.currentTimeMillis();
- lastActivatedBean.setValue(Long.toString(currentTime));
-
- if (added)
- sequencePropertyBeanMgr.insert(lastActivatedBean);
- else
- sequencePropertyBeanMgr.update(lastActivatedBean);
-
- }
-
- public static long getLastActivatedTime(String propertyKey,
StorageManager storageManager) throws SandeshaException {
-
- SequencePropertyBeanMgr seqPropBeanMgr =
storageManager.getSequencePropertyBeanMgr();
-
- SequencePropertyBean lastActivatedBean =
seqPropBeanMgr.retrieve(propertyKey,
-
Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
-
- long lastActivatedTime = -1;
-
- if (lastActivatedBean != null) {
- lastActivatedTime =
Long.parseLong(lastActivatedBean.getValue());
- }
-
- return lastActivatedTime;
- }
-
- public static boolean hasSequenceTimedOut(String propertyKey,
RMMsgContext rmMsgCtx, StorageManager storageManager)
+ public static boolean hasSequenceTimedOut(String internalSequenceId,
RMMsgContext rmMsgCtx, StorageManager storageManager)
throws SandeshaException {
// operation is the lowest level, Sandesha2 could be engaged.
@@ -426,11 +370,14 @@
boolean sequenceTimedOut = false;
- long lastActivatedTime = getLastActivatedTime(propertyKey,
storageManager);
- long timeNow = System.currentTimeMillis();
- if (lastActivatedTime > 0 && (lastActivatedTime +
propertyBean.getInactivityTimeoutInterval() < timeNow))
- sequenceTimedOut = true;
-
+ RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceId);
+
+ if (rmsBean != null) {
+ long lastActivatedTime = rmsBean.getLastActivatedTime();
+ long timeNow = System.currentTimeMillis();
+ if (lastActivatedTime > 0 && (lastActivatedTime +
propertyBean.getInactivityTimeoutInterval() < timeNow))
+ sequenceTimedOut = true;
+ }
return sequenceTimedOut;
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
Fri Jan 12 07:45:34 2007
@@ -229,9 +229,6 @@
private static boolean isPropertyDeletable(String name) {
boolean deleatable = true;
- if
(Sandesha2Constants.SequenceProperties.NO_OF_OUTGOING_MSGS_ACKED.equals(name))
- deleatable = false;
-
if
(Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name))
deleatable = false;
@@ -355,10 +352,8 @@
terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
- SequencePropertyBean transportToBean =
seqPropMgr.retrieve(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
- if (transportToBean != null) {
-
terminateRMMessage.setProperty(Constants.Configuration.TRANSPORT_URL,
transportToBean.getValue());
+ if (rmsBean.getTransportTo() != null) {
+
terminateRMMessage.setProperty(Constants.Configuration.TRANSPORT_URL,
rmsBean.getTransportTo());
}
terminateRMMessage.addSOAPEnvelope();
@@ -366,6 +361,8 @@
String key = SandeshaUtil.getUUID();
SenderBean terminateBean = new SenderBean();
+ terminateBean.setInternalSequenceID(internalSequenceID);
+ terminateBean.setSequenceID(outSequenceId);
terminateBean.setMessageContextRefKey(key);
// Set a retransmitter lastSentTime so that terminate will be
send with
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
Fri Jan 12 07:45:34 2007
@@ -50,6 +50,7 @@
private boolean sequenceExists;
private String outSequenceID;
private String rmVersion;
+ private RMSBean rmsBean;
/**
* Extracts information from the rmMsgCtx specific for processing out
messages
@@ -76,13 +77,10 @@
sequenceExists = false;
outSequenceID = null;
- // Get the Create sequence bean with the matching internal
sequenceid
- RMSBean createSeqFindBean = new RMSBean();
- createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
- RMSBean rMSBean =
storageManager.getRMSBeanMgr().findUnique(createSeqFindBean);
+ // Get the RMSBean with the matching internal sequenceid
+ rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
internalSequenceID);
- if (rMSBean == null)
+ if (rmsBean == null)
{
if (log.isDebugEnabled())
log.debug("Exit:
WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
@@ -91,10 +89,10 @@
SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
}
- if (rMSBean.getSequenceID() != null)
+ if (rmsBean.getSequenceID() != null)
{
sequenceExists = true;
- outSequenceID = rMSBean.getSequenceID();
+ outSequenceID = rmsBean.getSequenceID();
}
else
outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
@@ -117,8 +115,7 @@
rmMsgCtx.setTo(new EndpointReference(toAddress));
- String transportTo =
SandeshaUtil.getSequenceProperty(internalSequenceID,
-
Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
+ String transportTo = rmsBean.getTransportTo();
if (transportTo != null) {
rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
}
@@ -213,5 +210,9 @@
public final String getRMVersion() {
return rmVersion;
}
+
+ public final RMSBean getRMSBean() {
+ return rmsBean;
+ }
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
Fri Jan 12 07:45:34 2007
@@ -76,8 +76,8 @@
InvokerBeanMgr storageMapMgr = storageManager
.getInvokerBeanMgr();
- RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
- RMDBean rMDBean = nextMsgMgr.retrieve(sequenceID);
+ RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+ RMDBean rMDBean = rmdBeanMgr.retrieve(sequenceID);
if (rMDBean != null) {
@@ -124,7 +124,6 @@
if(msgNumber>highestMsgNumberInvoked){
highestMsgNumberInvoked
= invoker.getMsgNo();
rMDBean.setNextMsgNoToProcess(highestMsgNumberInvoked+1);
-
nextMsgMgr.update(rMDBean);
if(allowLaterDeliveryOfMissingMessages){
//we also need
to update the sequence OUT_OF_ORDER_RANGES property
@@ -134,26 +133,19 @@
Range r = new
Range(firstMessageInOutOfOrderWindow,highestMsgNumberInvoked);
RangeString
rangeString = null;
-
SequencePropertyBeanMgr seqPropertyManager =
storageManager.getSequencePropertyBeanMgr();
-
SequencePropertyBean outOfOrderRanges =
-
seqPropertyManager.retrieve(sequenceID,
Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
-
if(outOfOrderRanges==null){
+
if(rMDBean.getOutOfOrderRanges()==null){
//insert a new blank one one
-
outOfOrderRanges = new SequencePropertyBean(sequenceID,
-
Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES,
-
"");
-
-
seqPropertyManager.insert(outOfOrderRanges);
rangeString = new RangeString("");
}
else{
-
rangeString = new RangeString(outOfOrderRanges.getValue());
+
rangeString = new RangeString(rMDBean.getOutOfOrderRanges());
}
//update the
range String with the new value
rangeString.addRange(r);
-
outOfOrderRanges.setValue(rangeString.toString());
-
seqPropertyManager.update(outOfOrderRanges);
+
rMDBean.setOutOfOrderRanges(rangeString.toString());
}
+
+
rmdBeanMgr.update(rMDBean);
}
}
@@ -179,21 +171,19 @@
}
private void addOutOfOrderInvokerBeansToList(String sequenceID,
- StorageManager strMgr, List list)throws
SandeshaException{
+ StorageManager storageManager, List list)throws
SandeshaException{
if (log.isDebugEnabled())
- log.debug("Enter:
InOrderInvoker::addOutOfOrderInvokerBeansToList");
+ log.debug("Enter:
InOrderInvoker::addOutOfOrderInvokerBeansToList " + sequenceID + ", " + list);
- SequencePropertyBeanMgr seqPropertyManager =
strMgr.getSequencePropertyBeanMgr();
+ RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
- SequencePropertyBean outOfOrderRanges =
- seqPropertyManager.retrieve(sequenceID,
Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
- if(outOfOrderRanges!=null){
- String sequenceRanges = outOfOrderRanges.getValue();
+ if(rmdBean != null && rmdBean.getOutOfOrderRanges() != null){
+ String sequenceRanges = rmdBean.getOutOfOrderRanges();
RangeString rangeString = new
RangeString(sequenceRanges);
//we now have the set of ranges that can be delivered
out of order.
//Look for any invokable message that lies in one of
those ranges
Iterator invokerBeansIterator =
- strMgr.getInvokerBeanMgr().find(
+ storageManager.getInvokerBeanMgr().find(
new InvokerBean(null,
0, //finds all invoker beans
sequenceID)).iterator();
Modified:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java?view=diff&rev=495613&r1=495612&r2=495613
==============================================================================
---
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
(original)
+++
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/workers/ForceInboundDispatchTest.java
Fri Jan 12 07:45:34 2007
@@ -9,7 +9,6 @@
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
-import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.SandeshaTestCase;
import org.apache.sandesha2.client.SandeshaClient;
@@ -19,7 +18,6 @@
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
@@ -84,14 +82,9 @@
assertNotNull(rMDBean);
assertEquals(rMDBean.getNextMsgNoToProcess(), 4);
- //also check that the sequence has an out of order gap
that contains msg 2
- SequencePropertyBean outOfOrderRanges =
-
serverStore.getSequencePropertyBeanMgr().retrieve(
- inboundSequenceID,
-
Sandesha2Constants.SequenceProperties.OUT_OF_ORDER_RANGES);
-
- assertNotNull(outOfOrderRanges);
- RangeString rangeString = new
RangeString(outOfOrderRanges.getValue());
+ //also check that the sequence has an out of order gap
that contains msg 2
+ assertNotNull(rMDBean.getOutOfOrderRanges());
+ RangeString rangeString = new
RangeString(rMDBean.getOutOfOrderRanges());
assertTrue(rangeString.isMessageNumberInRanges(2));
t.commit();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]