Author: chamikara Date: Thu Sep 14 02:32:31 2006 New Revision: 443299 URL: http://svn.apache.org/viewvc?view=rev&rev=443299 Log: Changed the methods of the MsgProcessor interface to throw AxisFault instead of SandeshaException.
SandeshaoutHandler was changed not to pause messages with wsa:anonymous endpoint as the wsa:to address. This will allow at least a sync response to be done (but no retransmissions will be possible). A bug fix - CreateSequence should have a different replyTo from the application message. Some bug fixes in close sequence and terminate sequence processing logic. Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Thu Sep 14 02:32:31 2006 @@ -223,7 +223,7 
@@ log.debug("Exit: SandeshaGlobalInHandler::msgContext"); } - private boolean dropIfDuplicate(RMMsgContext rmMsgContext, StorageManager storageManager) throws SandeshaException { + private boolean dropIfDuplicate(RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: SandeshaGlobalInHandler::dropIfDuplicate"); @@ -335,7 +335,7 @@ } private void processDroppedMessage(RMMsgContext rmMsgContext, StorageManager storageManager) - throws SandeshaException { + throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: SandeshaGlobalInHandler::processDroppedMessage"); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Sep 14 02:32:31 2006 @@ -26,6 +26,7 @@ import org.apache.axiom.soap.SOAPFactory; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; +import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; @@ -88,7 +89,7 @@ private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class); - public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException { + public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: ApplicationMsgProcessor::processInMessage"); @@ -369,7 +370,7 @@ } public void sendAckIfNeeded(RMMsgContext 
rmMsgCtx, String messagesStr, StorageManager storageManager) - throws SandeshaException { + throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: ApplicationMsgProcessor::sendAckIfNeeded"); @@ -406,7 +407,7 @@ log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded"); } - public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException { + public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: ApplicationMsgProcessor::processOutMessage"); @@ -793,14 +794,23 @@ if (!dummyMessage) processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber, storageKey, storageManager); - msgContext.pause(); // the execution will be stopped. - + + if (!isWSAAnonymous (to)) { + //If message has a real to address or if it is in the polling-mode it shoud be send by the sender or should + //be taken away by make connections, so pausing it. + + msgContext.pause(); // the execution will be stopped. + } + + //If to address is wsa:anonymous it wont be possible to send the this message so, letting it go in the current thread. + //(it might get the the other end in the back-channel of the request message, no retransmissions possible). 
+ if (log.isDebugEnabled()) log.debug("Exit: ApplicationMsgProcessor::processOutMessage"); } private void addCreateSequenceMessage(RMMsgContext applicationRMMsg, String sequencePropertyKey, String internalSequenceId, String acksTo, - StorageManager storageManager) throws SandeshaException { + StorageManager storageManager) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + internalSequenceId); @@ -1164,5 +1174,13 @@ if (log.isDebugEnabled()) log.debug("Exit: ApplicationMsgProcessor::setNextMsgNo"); + } + + private boolean isWSAAnonymous (String address) { + if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) || + AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) + return true; + + return false; } } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Thu Sep 14 02:32:31 2006 @@ -17,6 +17,8 @@ package org.apache.sandesha2.msgprocessors; +import java.util.Iterator; + import org.apache.axiom.om.OMElement; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFactory; @@ -54,7 +56,7 @@ private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class); - public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException { + public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: CloseSequenceProcessor::processInMessage"); @@ -123,8 +125,8 @@ } // adding the 
ack part to the envelope. - SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgCtx - .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT); + Iterator sequenceAckIter = ackRMMsgCtx + .getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT); MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext(); @@ -139,9 +141,12 @@ RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg, storageManager); - closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, - sequenceAcknowledgement); - + while (sequenceAckIter.hasNext()) { + SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next(); + closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT, + sequenceAcknowledgement); + } + closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW); closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true"); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Sep 14 02:32:31 2006 @@ -64,7 +64,7 @@ private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class); - public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws SandeshaException { + public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault { if 
(log.isDebugEnabled()) log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage"); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Thu Sep 14 02:32:31 2006 @@ -17,6 +17,7 @@ package org.apache.sandesha2.msgprocessors; +import org.apache.axis2.AxisFault; import org.apache.sandesha2.RMMsgContext; import org.apache.sandesha2.SandeshaException; @@ -25,6 +26,6 @@ */ public interface MsgProcessor { - public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException; - public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException; + public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault; + public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault; } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Sep 14 02:32:31 2006 @@ -72,7 +72,7 @@ private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class); - public void processInMessage(RMMsgContext terminateSeqRMMsg) throws 
SandeshaException { + public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault { if (log.isDebugEnabled()) log.debug("Enter: TerminateSeqMsgProcessor::processInMessage"); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Thu Sep 14 02:32:31 2006 @@ -18,6 +18,7 @@ package org.apache.sandesha2.msgprocessors; import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.commons.logging.Log; @@ -41,7 +42,7 @@ private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class); public void processInMessage(RMMsgContext terminateResRMMsg) - throws SandeshaException { + throws AxisFault { if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage"); MessageContext msgContext = terminateResRMMsg.getMessageContext(); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java 
Thu Sep 14 02:32:31 2006 @@ -415,6 +415,7 @@ // end hack + //TODO this fails when the in message is in only. Fault is thrown at the InOnlyAxisOperation MessageContext faultMsgContext = Utils.createOutMessageContext(referenceMessage); // setting contexts. Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Thu Sep 14 02:32:31 2006 @@ -36,6 +36,7 @@ import org.apache.axis2.description.AxisOperation; import org.apache.axis2.description.AxisOperationFactory; import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -200,7 +201,7 @@ * @throws SandeshaException */ public static RMMsgContext createCreateSeqMsg(RMMsgContext applicationRMMsg, String sequencePropertyKey, - String acksTo, StorageManager storageManager) throws SandeshaException { + String acksTo, StorageManager storageManager) throws AxisFault { MessageContext applicationMsgContext = applicationRMMsg.getMessageContext(); if (applicationMsgContext == null) @@ -222,7 +223,7 @@ createSeqmsgContext = SandeshaUtil .createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation); - + initializeCreation(applicationMsgContext, createSeqmsgContext); OperationContext createSeqOpCtx = createSeqmsgContext.getOperationContext(); @@ -249,8 +250,24 @@ createSeqmsgContext.setAxisOperation(createSeqOperation); createSeqmsgContext.setTo(applicationRMMsg.getTo()); - 
createSeqmsgContext.setReplyTo(applicationRMMsg.getReplyTo()); - + +// createSeqmsgContext.setReplyTo(applicationRMMsg.getReplyTo()); + //generating a new replyTo address for the CreateSequenceMessage. + //Otherwise there will be errors when the app msg is InOnly. + + QName axisOperationName = createSeqmsgContext.getAxisOperation().getName(); + TransportInDescription transportIn = createSeqmsgContext.getTransportIn(); + QName transportInName = null; + if (transportIn!=null) + transportInName = transportIn.getName(); + + EndpointReference replyTo = context.getListenerManager().getEPRforService( + createSeqmsgContext.getAxisService().getName(), + axisOperationName!=null?axisOperationName.getLocalPart():null, + transportInName!=null?transportInName.getLocalPart():null); + + createSeqmsgContext.setReplyTo(replyTo); + RMMsgContext createSeqRMMsg = new RMMsgContext(createSeqmsgContext); String rmVersion = SandeshaUtil.getRMVersion(sequencePropertyKey, storageManager); Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Thu Sep 14 02:32:31 2006 @@ -962,7 +962,7 @@ * @return */ - public static String getSequencePropertyKey (RMMsgContext rmMsgContext) { + public static String getSequencePropertyKey (RMMsgContext rmMsgContext) throws AxisFault { String sequenceId = (String) rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID); String internalSequenceId = (String) rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID); @@ -979,9 +979,11 @@ propertyKey = 
internalSequenceId; else propertyKey = sequenceId; + } else if (flow==MessageContext.OUT_FAULT_FLOW) { + propertyKey = internalSequenceId; } - //TODO handler faults + //TODO handler cases not covered from above. return propertyKey; } Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=443299&r1=443298&r2=443299 ============================================================================== --- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original) +++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Thu Sep 14 02:32:31 2006 @@ -104,10 +104,14 @@ // RMMsgCreator.addAckMessage(rmMsgCtx); //} else - if (isAckPiggybackableMsgType(messageType)) { // checking weather this message can carry piggybacked acks + + if (isAckPiggybackableMsgType(messageType)) { // checking weather this message can carry piggybacked acks // piggybacking if an ack if available for the same // sequence. // TODO do piggybacking based on wsa:To + + + AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]