Author: mlovett
Date: Wed Feb 28 03:22:57 2007
New Revision: 512708
URL: http://svn.apache.org/viewvc?view=rev&rev=512708
Log:
Put the uuid associated with an anonymous endpoint into the RMSBean, instead of
on the Service Context
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
Wed Feb 28 03:22:57 2007
@@ -511,8 +511,6 @@
static final String RETRANSMITTABLE_PHASES = "RMRetransmittablePhases";
- static final String RM_ANON_UUID = "RMAnonymousUUID";
-
static final String propertiesToCopyFromReferenceMessage =
"propertiesToCopyFromReferenceMessage";
static final String propertiesToCopyFromReferenceRequestMessage =
"propertiesToCopyFromReferenceRequestMessage";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Wed Feb 28 03:22:57 2007
@@ -86,18 +86,6 @@
MessageContext msgContext = rmMsgCtx.getMessageContext();
ConfigurationContext configContext =
msgContext.getConfigurationContext();
- // Re-write the WS-A anonymous URI, if we support the RM anonymous URI. We only
- // need to rewrite the replyTo EPR if we have an out-in MEP.
- AxisOperation op = msgContext.getAxisOperation();
- int mep = WSDLConstants.MEP_CONSTANT_INVALID;
- if(op != null) {
- mep = op.getAxisSpecifMEPConstant();
- if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
- EndpointReference replyTo =
SandeshaUtil.rewriteEPR(msgContext.getReplyTo(), msgContext);
- msgContext.setReplyTo(replyTo);
- }
- }
-
// setting the Fault callback
SandeshaListener faultCallback = (SandeshaListener)
msgContext.getOptions().getProperty(
SandeshaClientConstants.SANDESHA_LISTENER);
@@ -273,11 +261,36 @@
rmsBean.setHighestOutMessageNumber(messageNumber);
// saving the used message number, and the expected reply count
+ boolean startPolling = false;
if (!dummyMessage) {
rmsBean.setNextMessageNumber(messageNumber);
+
+ // Identify the MEP associated with the message.
+ AxisOperation op = msgContext.getAxisOperation();
+ int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+ if(op != null) {
+ mep = op.getAxisSpecifMEPConstant();
+ }
+
if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
long expectedReplies =
rmsBean.getExpectedReplies();
rmsBean.setExpectedReplies(expectedReplies + 1);
+
+ // If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
+ EndpointReference oldEndpoint =
msgContext.getReplyTo();
+ String oldAddress = (oldEndpoint == null) ?
null : oldEndpoint.getAddress();
+ EndpointReference newReplyTo =
SandeshaUtil.rewriteEPR(rmsBean, msgContext.getReplyTo(), configContext);
+ String newAddress = (newReplyTo == null) ? null
: newReplyTo.getAddress();
+ if(newAddress != null &&
!newAddress.equals(oldAddress)) {
+ // We have rewritten the replyTo. If this is the first message that we have needed to
+ // rewrite then we should set the sequence up for polling, and once we have saved the
+ // changes to the sequence then we can start the polling thread.
+ msgContext.setReplyTo(newReplyTo);
+ if(!rmsBean.isPollingMode()) {
+ rmsBean.setPollingMode(true);
+ startPolling = true;
+ }
+ }
}
}
@@ -295,6 +308,10 @@
// Update the rmsBean
storageManager.getRMSBeanMgr().update(rmsBean);
+
+ if(startPolling) {
+
SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(),
rmsBean);
+ }
SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
if (env == null) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
Wed Feb 28 03:22:57 2007
@@ -140,7 +140,7 @@
boolean cleanAcks =
AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(),
beanToPoll.getNextMessageNumber());
long repliesExpected = beanToPoll.getExpectedReplies();
if(force || !cleanAcks || repliesExpected > 0)
- pollForSequence(beanToPoll.getSequenceID(),
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(),
beanToPoll, entry);
+ pollForSequence(beanToPoll.getAnonymousUUID(),
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(),
beanToPoll, entry);
}
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollRMSSide");
@@ -174,31 +174,35 @@
}
}
if(force || doPoll)
- pollForSequence(nextMsgBean.getSequenceID(),
nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean,
entry);
+ pollForSequence(null, null,
nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
}
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollRMDSide");
}
- private void pollForSequence(String sequenceId,
- String
sequencePropertyKey,
+ private void pollForSequence(String anonUUID, // Only set for RMS
polling
+ String
internalSeqId, // Only set for RMS polling
String
referenceMsgKey,
RMSequenceBean
rmBean,
SequenceEntry
entry)
throws SandeshaException, SandeshaStorageException, AxisFault
{
- if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey +
", " + referenceMsgKey + ", " + rmBean);
+ if(log.isDebugEnabled()) log.debug("Enter:
PollingManager::pollForSequence, rmBean: " + rmBean);
//create a MakeConnection message
String replyTo = rmBean.getReplyToEPR();
String wireSeqId = null;
String wireAddress = null;
- if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
+ if(anonUUID != null) {
+ // If we are polling on a RM anon URI then we don't want to include the sequence id
+ // in the MakeConnection message.
+ wireAddress = anonUUID;
+ } else if(SandeshaUtil.isWSRMAnonymous(replyTo)) {
// If we are polling on a RM anon URI then we don't want to include the sequence id
// in the MakeConnection message.
wireAddress = replyTo;
} else {
- wireSeqId = sequenceId;
+ wireSeqId = rmBean.getSequenceID();
}
MessageContext referenceMessage =
storageManager.retrieveMessageContext(referenceMsgKey,context);
@@ -218,7 +222,7 @@
//add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
SenderBean makeConnectionSenderBean = new SenderBean ();
- makeConnectionSenderBean.setInternalSequenceID((rmBean
instanceof RMSBean) ? sequencePropertyKey : null); // We only have internal ids
for the RMS-side
+ makeConnectionSenderBean.setInternalSequenceID(internalSeqId);
makeConnectionSenderBean.setMessageContextRefKey(makeConnectionMsgStoreKey);
makeConnectionSenderBean.setMessageID(makeConnectionRMMessage.getMessageId());
makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
Wed Feb 28 03:22:57 2007
@@ -80,6 +80,8 @@
private String offeredSequence = null;
+ private String anonymousUUID = null;
+
/**
* This is the timestamp of when the last error occured when sending
*/
@@ -325,6 +327,14 @@
this.rmsFlags |= EXPECTED_REPLIES;
}
+ public String getAnonymousUUID() {
+ return anonymousUUID;
+ }
+
+ public void setAnonymousUUID(String anonymousUUID) {
+ this.anonymousUUID = anonymousUUID;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -351,6 +361,7 @@
result.append("\nLastErrorTime : ");
result.append(lastSendErrorTimestamp);
}
result.append("\nClientCompletedMsgs: ");
result.append(clientCompletedMessages);
+ result.append("\nAnonymous UUID : ");
result.append(anonymousUUID);
return result.toString();
}
@@ -392,6 +403,9 @@
else if(bean.getOfferedSequence() != null &&
!bean.getOfferedSequence().equals(this.getOfferedSequence()))
match = false;
+ else if(bean.getAnonymousUUID() != null &&
!bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
+ match = false;
+
// Avoid matching on the error information
// else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 &&
bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
// match = false;
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Wed Feb 28 03:22:57 2007
@@ -947,13 +947,12 @@
return stackTrace;
}
- public static EndpointReference rewriteEPR(EndpointReference epr,
MessageContext mc)
+ public static EndpointReference rewriteEPR(RMSBean sourceBean,
EndpointReference epr, ConfigurationContext configContext)
throws SandeshaException
{
if (log.isDebugEnabled())
log.debug("Enter: SandeshaUtil::rewriteEPR " + epr);
- ConfigurationContext configContext =
mc.getConfigurationContext();
SandeshaPolicyBean policy =
SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
if(!policy.isEnableRMAnonURI()) {
if (log.isDebugEnabled())
@@ -969,24 +968,16 @@
if(address == null ||
AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address)
||
AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
- // We use the service context to co-ordinate the RM anon uuid, so that several
- // invocations of the same target will yield stable replyTo addresses.
- String uuid = null;
- ServiceContext sc = mc.getServiceContext();
- if(sc == null) {
- String msg =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
- throw new SandeshaException(msg);
- }
- synchronized (sc) {
- uuid = (String)
sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
- if(uuid == null) {
- uuid = SandeshaUtil.getUUID();
-
sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
- }
+ // We use the sequence to hold the anonymous uuid, so that messages assigned to the
+ // sequence will use the same UUID to identify themselves
+ String uuid = sourceBean.getAnonymousUUID();
+ if(uuid == null) {
+ uuid =
Sandesha2Constants.SPEC_2007_02.ANONYMOUS_URI_PREFIX + SandeshaUtil.getUUID();
+ sourceBean.setAnonymousUUID(uuid);
}
- if(log.isDebugEnabled()) log.debug("Rewriting EPR with
UUID " + uuid);
-
epr.setAddress(Sandesha2Constants.SPEC_2007_02.ANONYMOUS_URI_PREFIX + uuid);
+ if(log.isDebugEnabled()) log.debug("Rewriting EPR with
anon URI " + uuid);
+ epr.setAddress(uuid);
}
if (log.isDebugEnabled())
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Wed Feb 28 03:22:57 2007
@@ -20,6 +20,7 @@
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
import org.apache.commons.logging.Log;
@@ -253,8 +254,9 @@
}
}
// In case either of the replyTo or AcksTo is anonymous, rewrite them using the AnonURI template
- replyToEPR = SandeshaUtil.rewriteEPR(replyToEPR,
firstAplicationMsgCtx);
- acksToEPR = SandeshaUtil.rewriteEPR(acksToEPR,
firstAplicationMsgCtx);
+ ConfigurationContext config =
firstAplicationMsgCtx.getConfigurationContext();
+ replyToEPR = SandeshaUtil.rewriteEPR(rmsBean, replyToEPR,
config);
+ acksToEPR = SandeshaUtil.rewriteEPR(rmsBean, acksToEPR, config);
// Store both the acksTo and replyTo
if(replyToEPR != null)
rmsBean.setReplyToEPR(replyToEPR.getAddress());
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]