Author: chamikara
Date: Tue May 29 21:53:44 2007
New Revision: 542750
URL: http://svn.apache.org/viewvc?view=rev&rev=542750
Log:
A correction to set the messageType of empty body LastMessages correctly.
Added a new MessageType for 'Polling Response Messages'.
Made LastMessage Response and Polling Response messages OutOnly. So they will
go with OutOnly operations. this should be the case
since these does not relate to the request message.
Removed the duplicate MessageRetransmissionAdjuster.adjustRetransmittion call
in the MakeConnectionProcessor. This get called in the
SenderWorker.
Stopped the FaultManager from throwing out Faults directly. This causes
problems in scenarios such as Ack piggybacked Application msgs.
A fix to transfer RequestResponsTransport object from the request message to
the PollingResponse and LastMessageResponse messages.
SenderWorker was fixed to pick this correctly.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
Tue May 29 21:53:44 2007
@@ -269,9 +269,11 @@
int LAST_MESSAGE = 12;
- int DUPLICATE_MESSAGE = 13;
+ int DUPLICATE_MESSAGE = 13;
+
+ int POLL_RESPONSE_MESSAGE = 14;
- int MAX_MESSAGE_TYPE = 13;
+ int MAX_MESSAGE_TYPE = 14;
}
public interface MessageParts {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Tue May 29 21:53:44 2007
@@ -530,7 +530,13 @@
appMsgEntry.setMessageID(rmMsg.getMessageId());
appMsgEntry.setMessageNumber(messageNumber);
appMsgEntry.setLastMessage(lastMessage);
-
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+
+ SOAPEnvelope envelope = rmMsg.getSOAPEnvelope();
+ if (lastMessage && envelope!=null &&
envelope.getBody().getFirstOMChild()==null)
+
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.LAST_MESSAGE);
+ else
+
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+
appMsgEntry.setInboundSequenceId(inboundSequence);
appMsgEntry.setInboundMessageNumber(inboundMessageNumber);
if (outSequenceID == null) {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
Tue May 29 21:53:44 2007
@@ -9,10 +9,8 @@
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.description.AxisOperationFactory;
-import org.apache.axis2.description.OutInAxisOperation;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.util.MessageContextBuilder;
+import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.storage.StorageManager;
@@ -21,11 +19,9 @@
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.wsrm.Sequence;
-import org.ietf.jgss.MessageProp;
public class LastMessageProcessor implements MsgProcessor {
@@ -67,9 +63,11 @@
//there is a RMS sequence without a LastMsg entry
MessageContext msgContext =
rmMsgCtx.getMessageContext();
-// MessageContext outMessage =
MessageContextBuilder.createOutMessageContext(msgContext);
- MessageContext outMessageContext = new MessageContext
();
+ AxisOperation operation =
SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.LAST_MESSAGE,
+ rmMsgCtx.getRMSpecVersion() ,
msgContext.getAxisService());
+ MessageContext outMessageContext =
SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, operation);
+
outMessageContext.setServerSide(true);
outMessageContext.setTransportOut(msgContext.getTransportOut());
@@ -85,16 +83,7 @@
if (outMessageContext.getOptions()==null)
outMessageContext.setOptions(new Options ());
-
outMessageContext.setConfigurationContext(msgContext.getConfigurationContext());
-
outMessageContext.setServiceContext(msgContext.getServiceContext());
-
outMessageContext.setAxisService(msgContext.getAxisService());
-
- AxisOperation operation =
SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.LAST_MESSAGE,
-
rmMsgCtx.getRMSpecVersion() , msgContext.getAxisService());
-
- OperationContext operationContext = new
OperationContext (operation,msgContext.getServiceContext());
- operationContext.addMessageContext(outMessageContext);
-
+ OperationContext operationContext =
outMessageContext.getOperationContext();
String inboundSequenceId = (String)
msgContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
operationContext.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID,
inboundSequenceId);
@@ -103,13 +92,12 @@
operationContext.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER,
inboundMSgNo);
- outMessageContext.setAxisOperation(operation);
- outMessageContext.setOperationContext(operationContext);
-
outMessageContext.getOptions().setAction(Sandesha2Constants.SPEC_2005_02.Actions.ACTION_LAST_MESSAGE);
//says that the inbound msg of this was a LastMessage -
so the new msg will also be a LastMessage
outMessageContext.setProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE,
Boolean.TRUE);
+
outMessageContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
+ msgContext.getOperationContext().setProperty
(Constants.RESPONSE_WRITTEN,Constants.VALUE_TRUE);
AxisEngine engine = new AxisEngine
(rmMsgCtx.getConfigurationContext());
engine.send(outMessageContext);
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
Tue May 29 21:53:44 2007
@@ -11,7 +11,9 @@
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.OutOnlyAxisOperation;
import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -168,7 +170,10 @@
setTransportProperties (returnMessage, pollMessage);
// Link the response to the request
- OperationContext context =
pollMessage.getMessageContext().getOperationContext();
+
+ AxisOperation operation =
SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE,
pollMessage.getRMSpecVersion(),
pollMessage.getMessageContext().getAxisService());
+ OperationContext context = new OperationContext (operation,
pollMessage.getMessageContext().getServiceContext());
+
if(context == null) {
AxisOperation oldOperation =
returnMessage.getAxisOperation();
@@ -181,12 +186,11 @@
returnMessage.setOperationContext(context);
returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE,
Boolean.TRUE);
+
returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
- // Update the senderBeans send time.
- boolean continueSend =
-
MessageRetransmissionAdjuster.adjustRetransmittion(returnRMMsg,
matchingMessage, returnRMMsg.getConfigurationContext(), storageManager);
+ //marking pollMessage as responsed
+
pollMessage.getMessageContext().getOperationContext().setProperty
(Constants.RESPONSE_WRITTEN,Constants.VALUE_TRUE);
- //
// Commit the current transaction, so that the SenderWorker can
do it's own locking
if(transaction != null && transaction.isActive())
transaction.commit();
@@ -194,11 +198,10 @@
//This will allow Sandesha2 to consider both of following
senarios equally.
// 1. A message being sent by the Sender thread.
// 2. A message being sent as a reply to an MakeConnection.
- if (continueSend) {
- SenderWorker worker = new SenderWorker
(pollMessage.getConfigurationContext(), matchingMessage,
returnRMMsg.getRMSpecVersion());
- worker.setMessage(returnRMMsg);
- worker.run();
- }
+ SenderWorker worker = new SenderWorker
(pollMessage.getConfigurationContext(), matchingMessage,
pollMessage.getRMSpecVersion());
+ worker.setMessage(returnRMMsg);
+ worker.run();
+
if(log.isDebugEnabled()) log.debug("Exit:
MakeConnectionProcessor::replyToPoll");
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Tue May 29 21:53:44 2007
@@ -210,7 +210,12 @@
SenderBeanMgr senderBeanMgr =
storageManager.getSenderBeanMgr();
SenderBean findSenderBean = new SenderBean ();
-
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+
+ if
(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.LAST_MESSAGE)
+
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.LAST_MESSAGE);
+ else
+
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+
findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
findSenderBean.setSend(true);
@@ -297,7 +302,10 @@
SenderBean sender =
storageManager.getSenderBeanMgr().findUnique(matcher);
if(sender != null) {
if(log.isDebugEnabled())
log.debug("Deleting sender for sync-2-way message");
+
storageManager.removeMessageContext(sender.getMessageContextRefKey());
+
+ //this causes the request to be deleted
even without an ack.
storageManager.getSenderBeanMgr().delete(messageId);
// Try and terminate the corresponding
outbound sequence
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
Tue May 29 21:53:44 2007
@@ -459,44 +459,30 @@
String SOAPNamespaceValue = factory.getSoapVersionURI();
- if
(SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
- reason.addSOAPText(reasonText);
-
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME,
faultCode);
-
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME,
reason);
-
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME,
detail);
- } else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals
(SOAPNamespaceValue)) {
- reason.setText(data.getReason());
-
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME,
faultCode);
-
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME,
detail);
-
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME,
reason);
- // Need to send this message as the Axis Layer doesn't
set the "SequenceFault" header
- MessageContext faultMessageContext =
-
MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(),
null);
+ reason.setText(data.getReason());
+
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME,
faultCode);
+
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME,
detail);
+
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME,
reason);
+ // Need to send this message as the Axis Layer doesn't set the
"SequenceFault" header
+ MessageContext faultMessageContext =
+
MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(),
null);
-
SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext,
Sandesha2Constants.SOAPVersion.v1_1, data,
referenceRMMsgContext.getRMNamespaceValue());
+
SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext,
Sandesha2Constants.SOAPVersion.v1_1, data,
referenceRMMsgContext.getRMNamespaceValue());
-
referenceRMMsgContext.getMessageContext().getOperationContext().setProperty(
+
referenceRMMsgContext.getMessageContext().getOperationContext().setProperty(
org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
- // Set the action
- faultMessageContext.setWSAAction(
+ // Set the action
+ faultMessageContext.setWSAAction(
SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
- if (log.isDebugEnabled())
- log.debug("Sending fault message " +
faultMessageContext.getEnvelope().getHeader());
- // Send the message
- AxisEngine engine = new
AxisEngine(faultMessageContext.getConfigurationContext());
- engine.sendFault(faultMessageContext);
+ if (log.isDebugEnabled())
+ log.debug("Sending fault message " +
faultMessageContext.getEnvelope().getHeader());
+ // Send the message
+ AxisEngine engine = new
AxisEngine(faultMessageContext.getConfigurationContext());
+ engine.sendFault(faultMessageContext);
- return;
- } else {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSoapVersion);
- throw new SandeshaException (message);
- }
- AxisFault fault = new
AxisFault(faultColdValue.getTextAsQName(), data.getReason(), "", "",
data.getDetail());
-
fault.setFaultAction(SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
- throw fault;
-
+ return;
}
public static boolean isRMFault (String faultSubcodeValue) {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
Tue May 29 21:53:44 2007
@@ -420,6 +420,9 @@
newMessageContext.setProperty(MessageContext.TRANSPORT_OUT, referenceMessage
.getProperty(MessageContext.TRANSPORT_OUT));
+
newMessageContext.setProperty(AddressingConstants.WS_ADDRESSING_VERSION,
+
referenceMessage.getProperty(AddressingConstants.WS_ADDRESSING_VERSION));
+
copyConfiguredProperties
(referenceMessage,newMessageContext);
//copying the serverSide property
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SpecSpecificConstants.java
Tue May 29 21:53:44 2007
@@ -349,10 +349,13 @@
case Sandesha2Constants.MessageTypes.LAST_MESSAGE:
result =
service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
break;
- case Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE:
- result =
service.getOperation(Sandesha2Constants.RM_DUPLICATE_OPERATION);
- break;
- }
+ case Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE:
+ result =
service.getOperation(Sandesha2Constants.RM_DUPLICATE_OPERATION);
+ break;
+ case
Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE:
+ result =
service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
+ break;
+ }
} else
if(rmSpecLevel.equals(Sandesha2Constants.SPEC_VERSIONS.v1_1)) {
switch(messageType) {
case Sandesha2Constants.MessageTypes.CREATE_SEQ:
@@ -365,6 +368,9 @@
case Sandesha2Constants.MessageTypes.ACK_REQUEST:
result =
service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
break;
+ case
Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE:
+ result =
service.getOperation(Sandesha2Constants.RM_OUT_ONLY_OPERATION);
+ break;
}
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=542750&r1=542749&r2=542750
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Tue May 29 21:53:44 2007
@@ -35,12 +35,15 @@
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
@@ -60,7 +63,12 @@
private SenderBean senderBean = null;
private RMMsgContext messageToSend = null;
private String rmVersion = null;
+ private RMDBean incomingSequenceBean = null;
+ public void setIncomingSequenceBean(RMDBean incomingSequenceBean) {
+ this.incomingSequenceBean = incomingSequenceBean;
+ }
+
public SenderWorker (ConfigurationContext configurationContext,
SenderBean senderBean, String rmVersion) {
this.configurationContext = configurationContext;
this.senderBean = senderBean;
@@ -134,16 +142,20 @@
// or the message can't go anywhere. If there is
nothing here then we leave the
// message in the sender queue, and a MakeConnection
(or a retransmitted request)
// will hopefully pick it up soon.
- RequestResponseTransport t = null;
Boolean makeConnection = (Boolean)
msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
EndpointReference toEPR = msgCtx.getTo();
MessageContext inMsg = null;
OperationContext op = msgCtx.getOperationContext();
- if (op != null)
- inMsg =
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- if (inMsg != null)
- t = (RequestResponseTransport)
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ RequestResponseTransport t = (RequestResponseTransport)
msgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ if (t==null) {
+ if (op != null)
+ inMsg =
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ if (inMsg != null)
+ t = (RequestResponseTransport)
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+ }
// If we are anonymous, and this is not a
makeConnection, then we must have a transport waiting
if((toEPR==null || toEPR.hasAnonymousAddress()) &&
@@ -386,7 +398,8 @@
// Lock the message to enable retransmission update
senderBean =
storageManager.getSenderBeanMgr().retrieve(senderBean.getMessageID());
-
+ int messageType = senderBean.getMessageType();
+
// Only continue if we find a SenderBean
if (senderBean == null)
return false;
@@ -397,7 +410,9 @@
Identifier id = null;
- if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
+ if(messageType == Sandesha2Constants.MessageTypes.APPLICATION ||
+ messageType == Sandesha2Constants.MessageTypes.LAST_MESSAGE)
{
+
String namespace =
SpecSpecificConstants.getRMNamespaceValue(rmVersion);
Sequence sequence = (Sequence)
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if(sequence == null) {
@@ -417,17 +432,18 @@
sequence.setIdentifier(id);
rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
+
}
- } else if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+ } else if(messageType ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
TerminateSequence terminate = (TerminateSequence)
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
id = terminate.getIdentifier();
- } else if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
+ } else if(messageType ==
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
CloseSequence close = (CloseSequence)
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
id = close.getIdentifier();
- } else if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+ } else if(messageType ==
Sandesha2Constants.MessageTypes.ACK_REQUEST) {
// The only time that we can have a message of this
type is when we are sending a
// stand-alone ack request, and in that case we only
expect to find a single ack
// request header in the message.
@@ -447,6 +463,36 @@
// Write the changes back into the message context
rmMsgContext.addSOAPEnvelope();
+ }
+
+ //if this is an sync WSRM 1.0 case we always have to add an ack
+ boolean ackPresent = false;
+ Iterator it = rmMsgContext.getMessageParts
(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ if (it.hasNext())
+ ackPresent = true;
+
+ if (!ackPresent &&
rmMsgContext.getMessageContext().isServerSide()
+ &&
+
(messageType==Sandesha2Constants.MessageTypes.APPLICATION ||
+ messageType==Sandesha2Constants.MessageTypes.APPLICATION ||
+ messageType==Sandesha2Constants.MessageTypes.UNKNOWN ||
+
messageType==Sandesha2Constants.MessageTypes.LAST_MESSAGE)) {
+
+ String inboundSequenceId =
senderBean.getInboundSequenceId();
+ if (inboundSequenceId==null)
+ throw new SandeshaException ("InboundSequenceID
is not set for the sequence:" + id);
+
+ RMDBean findBean = new RMDBean ();
+ findBean.setSequenceID(inboundSequenceId);
+
+ if (incomingSequenceBean==null) {
+ RMDBeanMgr rmdMgr =
storageManager.getRMDBeanMgr();
+ incomingSequenceBean =
rmdMgr.findUnique(findBean);
+ }
+
+ if (incomingSequenceBean!=null)
+ RMMsgCreator.addAckMessage(rmMsgContext,
inboundSequenceId, incomingSequenceBean);
+
}
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]