Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=497693&r1=497692&r2=497693 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Thu Jan 18 18:53:14 2007 @@ -7,14 +7,17 @@ import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFault; import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.OperationContext; +import org.apache.axis2.context.OperationContextFactory; import org.apache.axis2.engine.AxisEngine; import org.apache.axis2.engine.Handler.InvocationResponse; import org.apache.axis2.transport.RequestResponseTransport; import org.apache.axis2.transport.TransportUtils; +import org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus; import org.apache.axis2.transport.http.HTTPConstants; import org.apache.axis2.wsdl.WSDLConstants; import org.apache.commons.logging.Log; @@ -33,6 +36,7 @@ import org.apache.sandesha2.util.MessageRetransmissionAdjuster; import org.apache.sandesha2.util.MsgInitializer; import org.apache.sandesha2.util.SandeshaUtil; +import org.apache.sandesha2.util.SpecSpecificConstants; import org.apache.sandesha2.util.TerminateManager; import org.apache.sandesha2.wsrm.TerminateSequence; @@ -83,14 +87,6 @@ rmMsgCtx = MsgInitializer.initializeMessage(msgCtx); } - boolean continueSending = MessageRetransmissionAdjuster.adjustRetransmittion(rmMsgCtx, senderBean, configurationContext, - storageManager); - if (!continueSending) { - if (log.isDebugEnabled()) - log.debug("Exit: SenderWorker::run, !continueSending"); - return; - } - // sender will not send the message if following property is // set and not true. // But it will set if it is not set (null) @@ -120,22 +116,52 @@ return; } - // If we are sending to the anonymous URI then we _must_ have a transport waiting, + String specVersion = rmMsgCtx.getRMSpecVersion(); + + // If we are sending to the anonymous URI and this is WSRM 1.0 then we _must_ have a transport waiting, // or the message can't go anywhere. If there is nothing here then we leave the // message in the sender queue, and a MakeConnection will hopefully pick it up // soon. + // If this is RM 1.1 we have the MakeConnection mechanism so this check is not necessary. EndpointReference toEPR = msgCtx.getTo(); - if(toEPR.hasAnonymousAddress()) { - RequestResponseTransport t = null; - MessageContext inMsg = null; - OperationContext op = msgCtx.getOperationContext(); - if(op != null) inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE); - if(inMsg != null) t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL); - if(t == null) { - if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::run, no response transport for anonymous message"); - return; + if(toEPR.hasAnonymousAddress() ) { + + if (specVersion != null && Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) { + //for RM 1.0 we dont let this go forward until a valid RequestResponseTransport is available to attach a + //async response. + RequestResponseTransport t = null; + MessageContext inMsg = null; + OperationContext op = msgCtx.getOperationContext(); + if (op != null) + inMsg = op.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE); + if (inMsg != null) + t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL); + + if (t == null || !t.getStatus().equals(RequestResponseTransportStatus.WAITING)) { + if (log.isDebugEnabled()) + log.debug("Exit: SenderWorker::run, no response transport for anonymous message"); + return; + } + } else { + //for RM 1.1 we dont let this go forward only if this is an response to a MakeConnection message + + Boolean makeConnectionResponse = (Boolean) msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE); + + if (makeConnectionResponse==null || !Boolean.TRUE.equals(makeConnectionResponse)) { + if (log.isDebugEnabled()) + log.debug("Exit: SenderWorker::run, no response transport for anonymous message"); + return; + } } } + + boolean continueSending = updateMessage(rmMsgCtx,senderBean,storageManager); + + if (!continueSending) { + if (log.isDebugEnabled()) + log.debug("Exit: SenderWorker::run, !continueSending"); + return; + } int messageType = senderBean.getMessageType(); @@ -150,6 +176,7 @@ //} else if (isAckPiggybackableMsgType(messageType)) { // checking weather this message can carry piggybacked acks + // checking weather this message can carry piggybacked acks // piggybacking if an ack if available for the same // sequence. // TODO do piggybacking based on wsa:To @@ -276,7 +303,10 @@ if (!msgCtx.isServerSide()) { // Commit the transaction to release the SenderBean - transaction.commit(); + + if (transaction!=null) + transaction.commit(); + transaction = null; checkForSyncResponses(msgCtx); } @@ -319,6 +349,14 @@ log.debug("Exit: SenderWorker::run"); } + private boolean updateMessage(RMMsgContext rmMsgContext, SenderBean senderBean, StorageManager storageManager) throws AxisFault { + + boolean continueSending = MessageRetransmissionAdjuster.adjustRetransmittion( + rmMsgContext, senderBean, rmMsgContext.getConfigurationContext(), storageManager); + + return continueSending; + } + private boolean isAckPiggybackableMsgType(int messageType) { if (log.isDebugEnabled()) log.debug("Enter: SenderWorker::isAckPiggybackableMsgType, " + messageType); @@ -362,30 +400,17 @@ // copying required properties from op. context to the response msg // ctx. + + // copying required properties from request op. context to the response msg + // ctx. + OperationContext requestMsgOpCtx = msgCtx.getOperationContext(); - if (requestMsgOpCtx != null) { - - // If the AxisOperation object doesn't have a message receiver, it means that this was - // an out only op where we have added an ACK to the response. Set the requestMsgOpCtx to - // be the RMIn - if (requestMsgOpCtx.getAxisOperation().getMessageReceiver() == null) { - // Generate a new RM In Only operation - requestMsgOpCtx = new OperationContext( msgCtx.getAxisService().getOperation(new QName("RMInOnlyOperation"))); - } - - responseMessageContext.setOperationContext(requestMsgOpCtx); - - if (responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE) == null) { - responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE, requestMsgOpCtx + responseMessageContext.setProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE, requestMsgOpCtx .getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE)); - } - - if (responseMessageContext.getProperty(HTTPConstants.CHAR_SET_ENCODING) == null) { - responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING, requestMsgOpCtx + responseMessageContext.setProperty(HTTPConstants.CHAR_SET_ENCODING, requestMsgOpCtx .getProperty(HTTPConstants.CHAR_SET_ENCODING)); - } - } + // If request is REST we assume the responseMessageContext is REST, // so set the variable @@ -398,10 +423,10 @@ resenvelope = TransportUtils.createSOAPMessage(responseMessageContext, msgCtx.getEnvelope().getNamespace().getNamespaceURI()); } catch (AxisFault e) { //Cannot find a valid SOAP envelope. - if (log.isDebugEnabled()) { - log.debug(SandeshaMessageHelper + if (log.isErrorEnabled() ) { + log.error (SandeshaMessageHelper .getMessage(SandeshaMessageKeys.soapEnvNotSet)); - log.debug("Caught exception", e); + log.error ("Caught exception", e); } return; @@ -411,6 +436,41 @@ if (log.isDebugEnabled()) log.debug("Response " + resenvelope.getHeader()); responseMessageContext.setEnvelope(resenvelope); + + +// //If this message is an RM Control message it should not be assumed as an application message. +// //So dispatching etc should happen just like for a new message comming into the system. +// RMMsgContext responseRMMsg = MsgInitializer.initializeMessage(responseMessageContext); +// if (responseRMMsg.getMessageType()!=Sandesha2Constants.MessageTypes.UNKNOWN && +// responseRMMsg.getMessageType()!=Sandesha2Constants.MessageTypes.APPLICATION ) { +// responseMessageContext.setAxisOperation(null); +// responseMessageContext.setOperationContext(null); +// } + + //If addressing is disabled we will be adding this message simply as the application response of the request message. + Boolean addressingDisabled = (Boolean) msgCtx.getOptions().getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES); + if (addressingDisabled!=null && Boolean.TRUE.equals(addressingDisabled)) { + // If the AxisOperation object doesn't have a message receiver, it means that this was + // an out only op where we have added an ACK to the response. Set the requestMsgOpCtx to + // be the RMIn + OperationContext responseMsgOpCtx = requestMsgOpCtx; + if (requestMsgOpCtx.getAxisOperation().getMessageReceiver() == null) { + // Generate a new RM In Only operation + responseMsgOpCtx = new OperationContext( msgCtx.getAxisService().getOperation(new QName("RMInOnlyOperation"))); + } + + responseMessageContext.setOperationContext(responseMsgOpCtx); + } + + RMMsgContext responseRMMessage = MsgInitializer.initializeMessage(responseMessageContext); + if (responseRMMessage.getMessageType()==Sandesha2Constants.MessageTypes.ACK) { + responseMessageContext.setAxisOperation(SpecSpecificConstants.getWSRMOperation + (Sandesha2Constants.MessageTypes.ACK, responseRMMessage.getRMSpecVersion(), responseMessageContext.getAxisService())); + responseMessageContext.setOperationContext(null); + } + + + AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext()); if (isFaultEnvelope(resenvelope)) {
Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java?view=diff&rev=497693&r1=497692&r2=497693 ============================================================================== --- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java (original) +++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java Thu Jan 18 18:53:14 2007 @@ -51,12 +51,22 @@ startServer(repoPath, axis2_xml); } - public void testSyncEchoWithOffer() throws Exception { + public void testSyncEchoWithOffer_1_1 () throws Exception { Options clientOptions = new Options (); String offeredSequenceID = SandeshaUtil.getUUID(); clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,offeredSequenceID); + clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1); + runSyncEchoTest(clientOptions); + } + + public void testSyncEchoWithOffer_1_0 () throws Exception { + + Options clientOptions = new Options (); + String offeredSequenceID = SandeshaUtil.getUUID(); + clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,offeredSequenceID); + clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_0); runSyncEchoTest(clientOptions); } @@ -64,6 +74,7 @@ public void testSyncEchoWithRMAnon() throws Exception { Options clientOptions = new Options (); + clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1); runSyncEchoTest(clientOptions); } @@ -80,7 +91,7 @@ String sequenceKey = SandeshaUtil.getUUID(); clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE, "true"); clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey); - clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1); + clientOptions.setTransportInProtocol(Constants.TRANSPORT_HTTP); ServiceClient serviceClient = new ServiceClient (configContext,null); Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/utils/RangeStringTest.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/utils/RangeStringTest.java?view=diff&rev=497693&r1=497692&r2=497693 ============================================================================== --- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/utils/RangeStringTest.java (original) +++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/utils/RangeStringTest.java Thu Jan 18 18:53:14 2007 @@ -13,15 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.sandesha2.util; +package org.apache.sandesha2.utils; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import org.apache.sandesha2.SandeshaTestCase; - -import com.ibm.jvm.util.ByteArrayOutputStream; +import org.apache.sandesha2.util.Range; +import org.apache.sandesha2.util.RangeString; public class RangeStringTest extends SandeshaTestCase{ --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
