Author: mlovett
Date: Tue Jan 30 10:23:44 2007
New Revision: 501504
URL: http://svn.apache.org/viewvc?view=rev&rev=501504
Log:
Fix some sync-2-way errors that stopped the backchannel working with WSRM 1.1
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=501504&r1=501503&r2=501504
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Tue Jan 30 10:23:44 2007
@@ -169,20 +169,12 @@
String specVersion = rmMsgCtx.getRMSpecVersion();
Boolean duplicateMessage = (Boolean)
rmMsgCtx.getProperty(Sandesha2Constants.DUPLICATE_MESSAGE);
String mep =
msgCtx.getAxisOperation().getMessageExchangePattern();
+ boolean syncReply = replyTo == null ||
replyTo.hasAnonymousAddress();
- if ((replyTo!=null && replyTo.hasAnonymousAddress()) &&
+ if (syncReply &&
!WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep) &&
(specVersion!=null &&
specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0)) &&
(duplicateMessage!=null &&
duplicateMessage.equals(Boolean.TRUE))){
- if (WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep)) {
- //This scenario has to be handled only for meps
with response messages
- result = InvocationResponse.SUSPEND;
-
- if (log.isDebugEnabled())
- log.debug("Exit:
SequenceProcessor::processReliableMessage" + result);
- return result;
- }
-
String outgoingSideInternalSequenceId =
SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
RMSBean rmsBean =
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager,
outgoingSideInternalSequenceId);
if (rmsBean==null) {
@@ -243,11 +235,10 @@
// pausing the thread, causing
the transport to wait.
// Sender will send the
outMessage correctly, using
//
RequestResponseTransportListner.
-
- if (log.isDebugEnabled())
- log.debug("Exit:
SequenceProcessor::processReliableMessage" + result);
result =
InvocationResponse.SUSPEND;
+ if (log.isDebugEnabled())
+ log.debug("Exit:
SequenceProcessor::processReliableMessage" + result);
return result;
}
}
@@ -257,8 +248,11 @@
throw new SandeshaException (message);
} else if (duplicateMessage!=null &&
duplicateMessage.equals(Boolean.TRUE)) {
- String message = "Unexpected scenario. This message
should have been dropped in the pre-dispatch level";
- throw new SandeshaException (message);
+ // Abort processing this duplicate
+ result = InvocationResponse.ABORT;
+ if (log.isDebugEnabled())
+ log.debug("Exit:
SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
+ return result;
}
String key = SandeshaUtil.getUUID(); // key to store the
message.
@@ -290,8 +284,10 @@
&&
(Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE ==
Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
// this is a duplicate message and the invocation type
is
// EXACTLY_ONCE.
- rmMsgCtx.pause();
- result = InvocationResponse.SUSPEND;
+ result = InvocationResponse.ABORT;
+ if (log.isDebugEnabled())
+ log.debug("Exit:
SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
+ return result;
}
if (!msgNoPresentInList)
@@ -311,9 +307,6 @@
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new
Long (msgNo));
-
-
-
// adding of acks
// if acksTo anonymous
@@ -352,47 +345,15 @@
if (inOrderInvocation) {
- //if replyTo is anonymous and this is not an InOnly
message
- //SUSPEND the execution for RM 1.0
- //Sender will attach a sync responseusing the
RequestResponseTransport object.
- //else
- //ABORT the execution
-
- // if (acksTo is anonymous and no response message has
been added)
- //send an ack to the back channel now.
-
- //add an antry to the invoker
-
- if ((replyTo!=null && replyTo.hasAnonymousAddress() &&
-
!WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep))) {
-
- if (specVersion!=null &&
specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0)) {
- result = InvocationResponse.SUSPEND;
- //in case of RM 1.0 result will be
suspended, causing the anon-response to be added using RequestResponseTransport
- //ack bean entry added previously may
cause an ack to be piggybacked.
- } else {
- result = InvocationResponse.ABORT;
- }
- } else {
- result = InvocationResponse.ABORT;
- }
-
-
+ // Whatever the MEP, we suspend processing here and the
invoker will do the real work
+ result = InvocationResponse.SUSPEND;
InvokerBeanMgr storageMapMgr =
storageManager.getInvokerBeanMgr();
- // saving the message.
- try {
- storageManager.storeMessageContext(key,
rmMsgCtx.getMessageContext());
- storageMapMgr.insert(new InvokerBean(key,
msgNo, sequenceId));
-
- // This will avoid performing application
processing more
- // than
- // once.
-
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+ storageManager.storeMessageContext(key,
rmMsgCtx.getMessageContext());
+ storageMapMgr.insert(new InvokerBean(key, msgNo,
sequenceId));
- } catch (Exception ex) {
- throw new SandeshaException(ex.getMessage(),
ex);
- }
+ // This will avoid performing application processing
more than once.
+
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
// Starting the invoker if stopped.
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(),
sequenceId);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=501504&r1=501503&r2=501504
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Tue Jan 30 10:23:44 2007
@@ -115,42 +115,27 @@
return;
}
- String specVersion = rmMsgCtx.getRMSpecVersion();
-
- // If we are sending to the anonymous URI and this is
WSRM 1.0 then we _must_ have a transport waiting,
+ // If we are sending to the anonymous URI then we
_must_ have a transport waiting,
// or the message can't go anywhere. If there is
nothing here then we leave the
- // message in the sender queue, and a MakeConnection
will hopefully pick it up
- // soon.
- // If this is RM 1.1 we have the MakeConnection
mechanism so this check is not necessary.
+ // message in the sender queue, and a MakeConnection
(or a retransmitted request)
+ // will hopefully pick it up soon.
+ Boolean makeConnection = (Boolean)
msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
EndpointReference toEPR = msgCtx.getTo();
- if(toEPR.hasAnonymousAddress() ) {
-
- if (specVersion != null &&
Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) {
- //for RM 1.0 we dont let this go
forward until a valid RequestResponseTransport is available to attach a
- //async response.
- RequestResponseTransport t = null;
- MessageContext inMsg = null;
- OperationContext op =
msgCtx.getOperationContext();
- if (op != null)
- inMsg =
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- if (inMsg != null)
- t = (RequestResponseTransport)
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+ if(toEPR.hasAnonymousAddress() &&
+ (makeConnection == null ||
!makeConnection.booleanValue())) {
- if (t == null ||
!t.getStatus().equals(RequestResponseTransportStatus.WAITING)) {
- if (log.isDebugEnabled())
- log.debug("Exit:
SenderWorker::run, no response transport for anonymous message");
- return;
- }
- } else {
- //for RM 1.1 we dont let this go
forward only if this is an response to a MakeConnection message
-
- Boolean makeConnectionResponse =
(Boolean) msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
-
- if (makeConnectionResponse==null ||
!Boolean.TRUE.equals(makeConnectionResponse)) {
- if (log.isDebugEnabled())
- log.debug("Exit:
SenderWorker::run, no response transport for anonymous message");
- return;
- }
+ RequestResponseTransport t = null;
+ MessageContext inMsg = null;
+ OperationContext op =
msgCtx.getOperationContext();
+ if (op != null)
+ inMsg =
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ if (inMsg != null)
+ t = (RequestResponseTransport)
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ if (t == null ||
!t.getStatus().equals(RequestResponseTransportStatus.WAITING)) {
+ if (log.isDebugEnabled())
+ log.debug("Exit:
SenderWorker::run, no response transport for anonymous message");
+ return;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]