Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Fri May 12 12:16:32 2006 @@ -20,6 +20,7 @@ import javax.xml.namespace.QName; import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.AddressingConstants; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.OperationContextFactory; @@ -34,6 +35,7 @@ import org.apache.sandesha2.msgprocessors.MsgProcessor; import org.apache.sandesha2.msgprocessors.MsgProcessorFactory; import org.apache.sandesha2.storage.StorageManager; +import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.util.MsgInitializer; import org.apache.sandesha2.util.SandeshaUtil; import org.apache.sandesha2.wsrm.Sequence; @@ -63,42 +65,71 @@ log.debug(message); throw new AxisFault(message); } - - // getting rm message - RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx); - String DONE = (String) msgCtx - .getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE); + String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE); if (null != DONE && "true".equals(DONE)) return; - msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true"); - - String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE); - boolean dummyMessage = false; - if (dummyMessageString!=null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString)) - dummyMessage = true; - + msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true"); + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); - MsgProcessor msgProcessor = null; - int messageType = rmMsgCtx.getMessageType(); - if (messageType==Sandesha2Constants.MessageTypes.UNKNOWN) { - MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE); - if (requestMsgCtx!=null) { //for the server side - RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx); - Sequence sequencePart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); - if (sequencePart!=null) - msgProcessor = new ApplicationMsgProcessor ();// a rm intended message. - } else if (!msgCtx.isServerSide()) //if client side. - msgProcessor = new ApplicationMsgProcessor (); - }else { - msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx); + boolean withinTransaction = false; + String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION); + if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) { + withinTransaction = true; } + + Transaction transaction = null; + if (!withinTransaction) { + transaction = storageManager.getTransaction(); + msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE); + } + boolean rolebacked = false; - if (msgProcessor!=null) - msgProcessor.processOutMessage(rmMsgCtx); - + try { + // getting rm message + RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx); + + String dummyMessageString = (String) msgCtx.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE); + boolean dummyMessage = false; + if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString)) + dummyMessage = true; + + MsgProcessor msgProcessor = null; + int messageType = rmMsgCtx.getMessageType(); + if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) { + MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext( + OperationContextFactory.MESSAGE_LABEL_IN_VALUE); + if (requestMsgCtx != null) { // for the server side + RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx); + Sequence sequencePart = (Sequence) reqRMMsgCtx + .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); + if (sequencePart != null) + msgProcessor = new ApplicationMsgProcessor();// a rm + // intended + // message. + } else if (!msgCtx.isServerSide()) // if client side. + msgProcessor = new ApplicationMsgProcessor(); + } else { + msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx); + } + + if (msgProcessor != null) + msgProcessor.processOutMessage(rmMsgCtx); + + } catch (Exception e) { + if (!withinTransaction) { + transaction.rollback(); + msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE); + rolebacked = true; + } + } finally { + if (!withinTransaction && !rolebacked) { + transaction.commit(); + msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE); + } + } } public QName getName() {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Fri May 12 12:16:32 2006 @@ -39,7 +39,6 @@ import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.storage.StorageManager; -import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; import org.apache.sandesha2.storage.beans.SenderBean; @@ -64,7 +63,6 @@ public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException { - AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST); if (ackRequested==null) { throw new SandeshaException ("Message identified as of type ackRequested does not have an AckRequeted element"); @@ -79,9 +77,11 @@ String sequenceID = ackRequested.getIdentifier().getIdentifier(); ConfigurationContext configurationContext = rmMsgCtx.getMessageContext().getConfigurationContext(); + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); - SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr(); + SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr(); + //Setting the ack depending on AcksTo. SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR); @@ -139,8 +139,6 @@ ackMsgCtx.setProperty(AddressingConstants.WS_ADDRESSING_VERSION, msgContext.getProperty(AddressingConstants.WS_ADDRESSING_VERSION)); //TODO do this in the RMMsgCreator -// RMMsgContext ackRMMsgCtx = rmm - String addressingNamespaceURI = SandeshaUtil.getSequenceProperty(sequenceID,Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,configurationContext); String anonymousURI = SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI); @@ -170,15 +168,15 @@ rmMsgCtx.getMessageContext().setProperty( Sandesha2Constants.ACK_WRITTEN, "true"); + try { engine.send(ackRMMsgCtx.getMessageContext()); } catch (AxisFault e1) { throw new SandeshaException(e1.getMessage()); } + } else { - Transaction asyncAckTransaction = storageManager.getTransaction(); - SenderBeanMgr retransmitterBeanMgr = storageManager .getRetransmitterBeanMgr(); @@ -238,8 +236,6 @@ //inserting the new ack. retransmitterBeanMgr.insert(ackBean); - asyncAckTransaction.commit(); - //passing the message through sandesha2sender ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut()); @@ -255,6 +251,7 @@ } catch (AxisFault e) { throw new SandeshaException (e.getMessage()); } + SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Fri May 12 12:16:32 2006 @@ -31,7 +31,6 @@ import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.storage.StorageManager; -import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; import org.apache.sandesha2.storage.beans.SenderBean; @@ -65,6 +64,8 @@ throw new SandeshaException(message); } + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext()); + MessageContext msgCtx = rmMsgCtx.getMessageContext(); ConfigurationContext configCtx = msgCtx.getConfigurationContext(); @@ -72,16 +73,12 @@ sequenceAck.setMustUnderstand(false); rmMsgCtx.addSOAPEnvelope(); - StorageManager storageManager = SandeshaUtil - .getSandeshaStorageManager(rmMsgCtx.getMessageContext() - .getConfigurationContext()); + SenderBeanMgr retransmitterMgr = storageManager .getRetransmitterBeanMgr(); SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); - - Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges() .iterator(); @@ -96,6 +93,7 @@ FaultManager faultManager = new FaultManager(); RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,outSequenceId); if (faultMessageContext != null) { + ConfigurationContext configurationContext = msgCtx.getConfigurationContext(); AxisEngine engine = new AxisEngine(configurationContext); @@ -105,11 +103,13 @@ throw new SandeshaException ("Could not send the fault message",e); } + msgCtx.pause(); return; } faultMessageContext = faultManager.checkForInvalidAcknowledgement(rmMsgCtx); if (faultMessageContext != null) { + ConfigurationContext configurationContext = msgCtx.getConfigurationContext(); AxisEngine engine = new AxisEngine(configurationContext); @@ -119,25 +119,22 @@ throw new SandeshaException ("Could not send the fault message",e); } + msgCtx.pause(); return; } String internalSequenceID = SandeshaUtil.getSequenceProperty(outSequenceId,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID,configCtx); //updating the last activated time of the sequence. - Transaction lastUpdatedTimeTransaction = storageManager.getTransaction(); SequenceManager.updateLastActivatedTime(internalSequenceID,rmMsgCtx.getMessageContext().getConfigurationContext()); - lastUpdatedTimeTransaction.commit(); - //Starting transaction - Transaction ackTransaction = storageManager.getTransaction(); - SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve( outSequenceId, Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID); if (internalSequenceBean == null || internalSequenceBean.getValue() == null) { String message = "TempSequenceId is not set correctly"; log.debug(message); + throw new SandeshaException(message); } @@ -218,9 +215,6 @@ allCompletedMsgsBean.setValue(str); seqPropMgr.update(allCompletedMsgsBean); - - //commiting transaction - ackTransaction.commit(); String lastOutMsgNoStr = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,configCtx); if (lastOutMsgNoStr!=null ) { Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Fri May 12 12:16:32 2006 @@ -42,7 +42,6 @@ import org.apache.sandesha2.client.SandeshaClientConstants; import org.apache.sandesha2.client.SandeshaListener; import org.apache.sandesha2.storage.StorageManager; -import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr; import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr; import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr; @@ -127,8 +126,6 @@ .getSandeshaStorageManager(rmMsgCtx.getMessageContext() .getConfigurationContext()); - - FaultManager faultManager = new FaultManager(); RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx); if (faultMessageContext != null) { @@ -141,6 +138,7 @@ throw new SandeshaException ("Could not send the fault message",e); } + msgCtx.pause(); return; } @@ -169,6 +167,7 @@ throw new SandeshaException ("Could not send the fault message",e); } + msgCtx.pause(); return; } @@ -191,14 +190,8 @@ return; } - Transaction lastUpdatedTimeTransaction = storageManager.getTransaction(); - //updating the last activated time of the sequence. SequenceManager.updateLastActivatedTime(sequenceId,configCtx); - lastUpdatedTimeTransaction.commit(); - - Transaction updataMsgStringTransaction = storageManager - .getTransaction(); SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId, Sandesha2Constants.SequenceProperties.SERVER_COMPLETED_MESSAGES); @@ -263,16 +256,13 @@ msgsBean.setValue(messagesStr); seqPropMgr.update(msgsBean); - updataMsgStringTransaction.commit(); - - Transaction invokeTransaction = storageManager.getTransaction(); - // Pause the messages bean if not the right message to invoke. NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr(); NextMsgBean bean = mgr.retrieve(sequenceId); - if (bean == null) + if (bean == null) { throw new SandeshaException("Error- The sequence does not exist"); + } InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr(); @@ -281,8 +271,6 @@ if (inOrderInvocation) { - //pause the message - rmMsgCtx.pause(); SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr .retrieve( @@ -299,6 +287,7 @@ incomingSequenceListBean.setValue(incomingSequenceList .toString()); + //this get inserted before seqPropMgr.insert(incomingSequenceListBean); } @@ -312,7 +301,7 @@ //saving the property. incomingSequenceListBean.setValue(incomingSequenceList .toString()); - seqPropMgr.insert(incomingSequenceListBean); + seqPropMgr.update(incomingSequenceListBean); } //saving the message. @@ -330,6 +319,9 @@ } catch (Exception ex) { throw new SandeshaException(ex.getMessage()); } + + //pause the message + rmMsgCtx.pause(); //Starting the invoker if stopped. SandeshaUtil @@ -337,8 +329,6 @@ } - invokeTransaction.commit(); - //Sending acknowledgements sendAckIfNeeded(rmMsgCtx, messagesStr); @@ -367,7 +357,7 @@ .getSandeshaStorageManager(msgCtx.getConfigurationContext()); SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); - + Sequence sequence = (Sequence) rmMsgCtx .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); String sequenceId = sequence.getIdentifier().getIdentifier(); @@ -416,7 +406,6 @@ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext); SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr(); - Transaction outHandlerTransaction = storageManager.getTransaction(); boolean serverSide = msgContext.isServerSide(); // setting message Id if null @@ -590,16 +579,18 @@ throw new SandeshaException (e); } - if (requestMessageContext==null) + if (requestMessageContext==null) { throw new SandeshaException ("Request message context is null, cant find out the request side sequenceID"); + } RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext); Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); String requestSequenceID = sequence.getIdentifier().getIdentifier(); SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION); - if (specVersionBean==null) + if (specVersionBean==null) { throw new SandeshaException ("SpecVersion sequence property bean is not available for the incoming sequence. Cant find the RM version for outgoing side"); + } specVersion = specVersionBean.getValue(); } else { @@ -744,8 +735,6 @@ processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber,storageKey); msgContext.pause(); // the execution will be stopped. - outHandlerTransaction.commit(); - } private void addCreateSequenceMessage(RMMsgContext applicationRMMsg, Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Fri May 12 12:16:32 2006 @@ -11,7 +11,6 @@ import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.storage.StorageManager; -import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; import org.apache.sandesha2.storage.beans.SequencePropertyBean; import org.apache.sandesha2.util.AcknowledgementManager; @@ -45,13 +44,12 @@ throw new SandeshaException ("Could not send the fault message",e); } + msgCtx.pause(); return; } StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx); - Transaction closeSequenceTransaction = storageManager.getTransaction(); - SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean sequenceClosedBean = new SequencePropertyBean (); sequenceClosedBean.setSequenceID(sequenceID); @@ -107,9 +105,6 @@ String message = "Could not send the terminate sequence response"; throw new SandeshaException (message,e); } - - - closeSequenceTransaction.commit(); } public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException { Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri May 12 12:16:32 2006 @@ -34,7 +34,6 @@ import org.apache.sandesha2.client.SandeshaClientConstants; import org.apache.sandesha2.client.SandeshaListener; import org.apache.sandesha2.storage.StorageManager; -import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; import org.apache.sandesha2.storage.beans.CreateSeqBean; @@ -82,6 +81,7 @@ throw new SandeshaException ("Could not send the fault message",e); } + createSeqMsg.pause(); return; } @@ -92,7 +92,6 @@ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr(); - Transaction createSequenceTransaction = storageManager.getTransaction(); //begining of a new transaction try { String newSequenceId = SequenceManager.setupNewSequence(createSeqRMMsg); //newly created sequnceID. @@ -170,11 +169,8 @@ outMessage.setResponseWritten(true); //commiting tr. before sending the response msg. - createSequenceTransaction.commit(); - Transaction updateLastActivatedTransaction = storageManager.getTransaction(); SequenceManager.updateLastActivatedTime(newSequenceId,createSeqRMMsg.getMessageContext().getConfigurationContext()); - updateLastActivatedTransaction.commit(); AxisEngine engine = new AxisEngine(context); engine.send(outMessage); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Fri May 12 12:16:32 2006 @@ -75,7 +75,7 @@ .getSandeshaStorageManager(configCtx); //Processing for ack if available - Transaction ackProcessTransaction = storageManager.getTransaction(); +/// Transaction ackProcessTransaction = storageManager.getTransaction(); SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) createSeqResponseRMMsgCtx .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT); @@ -84,11 +84,11 @@ ackProcessor.processInMessage(createSeqResponseRMMsgCtx); } - ackProcessTransaction.commit(); +/// ackProcessTransaction.commit(); //Processing the create sequence response. - Transaction createSeqResponseTransaction = storageManager.getTransaction(); +/// Transaction createSeqResponseTransaction = storageManager.getTransaction(); CreateSequenceResponse createSeqResponsePart = (CreateSequenceResponse) createSeqResponseRMMsgCtx .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ_RESPONSE); @@ -154,10 +154,10 @@ sequencePropMgr.insert(outSequenceBean); sequencePropMgr.insert(internalSequenceBean); - createSeqResponseTransaction.commit(); +/// createSeqResponseTransaction.commit(); - Transaction offerProcessTransaction = storageManager.getTransaction(); +/// Transaction offerProcessTransaction = storageManager.getTransaction(); //processing for accept (offer has been sent) Accept accept = createSeqResponsePart.getAccept(); @@ -217,9 +217,9 @@ } - offerProcessTransaction.commit(); +/// offerProcessTransaction.commit(); - Transaction updateAppMessagesTransaction = storageManager.getTransaction(); +/// Transaction updateAppMessagesTransaction = storageManager.getTransaction(); SenderBean target = new SenderBean(); target.setInternalSequenceID(internalSequenceId); @@ -282,11 +282,11 @@ storageManager.updateMessageContext(key,applicationMsg); } - updateAppMessagesTransaction.commit(); +/// updateAppMessagesTransaction.commit(); - Transaction lastUpdatedTimeTransaction = storageManager.getTransaction(); +/// Transaction lastUpdatedTimeTransaction = storageManager.getTransaction(); SequenceManager.updateLastActivatedTime(internalSequenceId,configCtx); - lastUpdatedTimeTransaction.commit(); +/// lastUpdatedTimeTransaction.commit(); createSeqResponseRMMsgCtx.getMessageContext().getOperationContext() .setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Fri May 12 12:16:32 2006 @@ -39,7 +39,6 @@ import org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.client.SandeshaClientConstants; import org.apache.sandesha2.storage.StorageManager; -import org.apache.sandesha2.storage.Transaction; import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr; import org.apache.sandesha2.storage.beans.SenderBean; @@ -106,6 +105,8 @@ } catch (AxisFault e) { throw new SandeshaException ("Could not send the fault message",e); } + + terminateSeqMsg.pause(); return; } @@ -113,7 +114,6 @@ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr(); - Transaction terminateReceivedTransaction = storageManager.getTransaction(); SequencePropertyBean terminateReceivedBean = new SequencePropertyBean (); terminateReceivedBean.setSequenceID(sequenceId); terminateReceivedBean.setName(Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED); @@ -127,9 +127,6 @@ setUpHighestMsgNumbers(context,storageManager,sequenceId,terminateSeqRMMsg); - terminateReceivedTransaction.commit(); - - Transaction terminateTransaction = storageManager.getTransaction(); TerminateManager.cleanReceivingSideOnTerminateMessage(context,sequenceId); @@ -139,16 +136,12 @@ sequencePropertyBeanMgr.insert(terminatedBean); - terminateTransaction.commit(); - //removing an entry from the listener String transport = terminateSeqMsg.getTransportIn().getName().getLocalPart(); - Transaction lastUpdatedTransaction = storageManager.getTransaction(); SequenceManager.updateLastActivatedTime(sequenceId,context); - lastUpdatedTransaction.commit(); - terminateSeqRMMsg.pause(); + terminateSeqMsg.pause(); } private void setUpHighestMsgNumbers (ConfigurationContext configCtx, StorageManager storageManager, String sequenceID, RMMsgContext terminateRMMsg) throws SandeshaException { @@ -281,7 +274,7 @@ if (outSequenceID==null) throw new SandeshaException ("SequenceID was not found. Cannot send the terminate message"); - Transaction addTerminateSeqTransaction = storageManager.getTransaction(); +/// Transaction addTerminateSeqTransaction = storageManager.getTransaction(); String terminated = SandeshaUtil.getSequenceProperty(outSequenceID, Sandesha2Constants.SequenceProperties.TERMINATE_ADDED,configurationContext); @@ -384,7 +377,7 @@ rmMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key); rmMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE); rmMsgCtx.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ()); - addTerminateSeqTransaction.commit(); +/// addTerminateSeqTransaction.commit(); AxisEngine engine = new AxisEngine (configurationContext); try { Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/AcknowledgementManager.java Fri May 12 12:16:32 2006 @@ -291,7 +291,7 @@ } else { - Transaction asyncAckTransaction = storageManager.getTransaction(); +/// Transaction asyncAckTransaction = storageManager.getTransaction(); SenderBeanMgr retransmitterBeanMgr = storageManager .getRetransmitterBeanMgr(); @@ -338,7 +338,7 @@ //inserting the new ack. retransmitterBeanMgr.insert(ackBean); - asyncAckTransaction.commit(); +/// asyncAckTransaction.commit(); //passing the message through sandesha2sender ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut()); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java Fri May 12 12:16:32 2006 @@ -411,6 +411,9 @@ } else { SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); + + //TODO get the acksTo value using the property key. + String sequenceId = data.getSequenceId(); SequencePropertyBean acksToBean = seqPropMgr.retrieve( sequenceId, Sandesha2Constants.SequenceProperties.ACKS_TO_EPR); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Fri May 12 12:16:32 2006 @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sandesha2.RMMsgContext; +import org.apache.sandesha2.Sandesha2Constants; import org.apache.sandesha2.SandeshaException; import org.apache.sandesha2.client.SandeshaClient; import org.apache.sandesha2.client.SandeshaClientConstants; @@ -132,6 +133,8 @@ private void finalizeTimedOutSequence (String internalSequenceID, String sequenceID ,MessageContext messageContext) throws SandeshaException { ConfigurationContext configurationContext = messageContext.getConfigurationContext(); + + configurationContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,messageContext.getProperty(Sandesha2Constants.WITHIN_TRANSACTION)); SequenceReport report = SandeshaClient.getOutgoingSequenceReport(internalSequenceID ,configurationContext); TerminateManager.timeOutSendingSideSequence(configurationContext,internalSequenceID, false); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/PropertyManager.java Fri May 12 12:16:32 2006 @@ -166,7 +166,7 @@ try { processor.setup(); } catch (NoSuchMethodException e) { - throw new SandeshaException(e.getMessage()); + throw new SandeshaException(e); } processor.processPolicy(policy); Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Fri May 12 12:16:32 2006 @@ -419,7 +419,7 @@ public static long getOutGoingSequenceAckedMessageCount (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException { StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); - Transaction transaction = storageManager.getTransaction(); +/// Transaction transaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean findSeqIDBean = new SequencePropertyBean (); @@ -447,14 +447,14 @@ return 0; //No acknowledgement has been received yet. long noOfMessagesAcked = Long.parseLong(ackedMsgBean.getValue()); - transaction.commit(); +/// transaction.commit(); return noOfMessagesAcked; } public static boolean isOutGoingSequenceCompleted (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException { StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); - Transaction transaction = storageManager.getTransaction(); +/// Transaction transaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean findSeqIDBean = new SequencePropertyBean (); @@ -484,14 +484,14 @@ if ("true".equals(terminateAddedBean.getValue())) return true; - transaction.commit(); +/// transaction.commit(); return false; } public static boolean isIncomingSequenceCompleted (String sequenceID, ConfigurationContext configurationContext) throws SandeshaException { StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext); - Transaction transaction = storageManager.getTransaction(); +/// Transaction transaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr(); SequencePropertyBean terminateReceivedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.TERMINATE_RECEIVED); @@ -500,7 +500,7 @@ if (terminateReceivedBean!=null && "true".equals(terminateReceivedBean.getValue())) complete = true; - transaction.commit(); +/// transaction.commit(); return complete; } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/TerminateManager.java Fri May 12 12:16:32 2006 @@ -240,8 +240,8 @@ if (Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID.equals(name)) deleatable = false; - if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name)) - deleatable = false; +// if (Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION.equals(name)) +// deleatable = false; if (Sandesha2Constants.SequenceProperties.SEQUENCE_TERMINATED.equals(name)) deleatable = false; @@ -316,6 +316,7 @@ SequencePropertyBean sequencePropertyBean = (SequencePropertyBean) iterator.next(); doUpdatesIfNeeded (outSequenceID,sequencePropertyBean,sequencePropertyBeanMgr); + //TODO all properties which hv the temm:Seq:id as the key should be deletable. if (isProportyDeletable(sequencePropertyBean.getName())) { sequencePropertyBeanMgr.delete(sequencePropertyBean.getSequenceID(),sequencePropertyBean.getName()); } @@ -330,7 +331,7 @@ StorageManager storageManager = SandeshaUtil .getSandeshaStorageManager(configurationContext); - Transaction addTerminateSeqTransaction = storageManager.getTransaction(); +/// Transaction addTerminateSeqTransaction = storageManager.getTransaction(); SequencePropertyBeanMgr seqPropMgr = storageManager .getSequencePropretyBeanMgr(); @@ -424,7 +425,7 @@ terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key); terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE); terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ()); - addTerminateSeqTransaction.commit(); +/// addTerminateSeqTransaction.commit(); AxisEngine engine = new AxisEngine (configurationContext); try { Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Fri May 12 12:16:32 2006 @@ -100,43 +100,44 @@ log.debug(ex.getMessage()); } + Transaction transaction = null; + boolean rolebacked = false; + try { - StorageManager storageManager = SandeshaUtil - .getSandeshaStorageManager(context); + StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context); NextMsgBeanMgr nextMsgMgr = storageManager.getNextMsgBeanMgr(); - InvokerBeanMgr storageMapMgr = storageManager - .getStorageMapBeanMgr(); + InvokerBeanMgr storageMapMgr = storageManager.getStorageMapBeanMgr(); SequencePropertyBeanMgr sequencePropMgr = storageManager .getSequencePropretyBeanMgr(); - Transaction preInvocationTransaction = storageManager.getTransaction(); + transaction = storageManager.getTransaction(); //Getting the incomingSequenceIdList SequencePropertyBean allSequencesBean = (SequencePropertyBean) sequencePropMgr .retrieve( Sandesha2Constants.SequenceProperties.ALL_SEQUENCES, Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST); - if (allSequencesBean == null) - continue; - - ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean - .getValue()); - preInvocationTransaction.commit(); + if (allSequencesBean == null) { + continue; + } + ArrayList allSequencesList = SandeshaUtil.getArrayListFromString (allSequencesBean.getValue()); Iterator allSequencesItr = allSequencesList.iterator(); - + currentIteration: while (allSequencesItr.hasNext()) { - String sequenceId = (String) allSequencesItr.next(); - Transaction invocationTransaction = storageManager.getTransaction(); //Transaction based invocation + //commiting the old transaction + transaction.commit(); + + //starting a new transaction for the new iteration. + transaction = storageManager.getTransaction(); NextMsgBean nextMsgBean = nextMsgMgr.retrieve(sequenceId); if (nextMsgBean == null) { - String message = "Next message not set correctly. Removing invalid entry."; log.debug(message); allSequencesItr.remove(); @@ -144,14 +145,12 @@ //cleaning the invalid data of the all sequences. allSequencesBean.setValue(allSequencesList.toString()); sequencePropMgr.update(allSequencesBean); - - throw new SandeshaException (message); + continue; } long nextMsgno = nextMsgBean.getNextMsgNoToProcess(); if (nextMsgno <= 0) { - String message = "Invalid messaage number as the Next Message Number. Removing invalid entry"; - + String message = "Invalid message number as the Next Message Number."; throw new SandeshaException(message); } @@ -163,57 +162,36 @@ while (stMapIt.hasNext()) { - InvokerBean stMapBean = (InvokerBean) stMapIt - .next(); + InvokerBean stMapBean = (InvokerBean) stMapIt.next(); String key = stMapBean.getMessageContextRefKey(); - MessageContext msgToInvoke = storageManager.retrieveMessageContext(key,context); + RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke); - RMMsgContext rmMsg = MsgInitializer - .initializeMessage(msgToInvoke); - Sequence seq = (Sequence) rmMsg - .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); - - long msgNo = seq.getMessageNumber().getMessageNumber(); - + //have to commit the transaction before invoking. This may get changed when WS-AT is available. + transaction.commit(); + try { - //Invoking the message. - - //currently Transaction based invocation can be supplied only for the in-only case. - - if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) { - invocationTransaction.commit(); - } - + //Invoking the message. + msgToInvoke.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE); new AxisEngine (msgToInvoke.getConfigurationContext()) .resume(msgToInvoke); invoked = true; - - if (!AxisOperationFactory.MEP_URI_IN_ONLY.equals(msgToInvoke.getAxisOperation().getMessageExchangePattern())) { - invocationTransaction = storageManager.getTransaction(); - } - storageMapMgr.delete(key); } catch (AxisFault e) { throw new SandeshaException(e); + } finally { + transaction = storageManager.getTransaction(); } - + //undating the next msg to invoke - if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) { Sequence sequence = (Sequence) rmMsg .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); if (sequence.getLastMessage() != null) { - TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId); - - //this sequence has no more invocations -// stopInvokerForTheSequence(sequenceId); - //exit from current iteration. (since an entry was removed) - invocationTransaction.commit(); break currentIteration; } } @@ -223,15 +201,19 @@ nextMsgno++; nextMsgBean.setNextMsgNoToProcess(nextMsgno); nextMsgMgr.update(nextMsgBean); - invocationTransaction.commit(); - } + } } - } catch (SandeshaException e1) { + } catch (Exception e1) { e1.printStackTrace(); + if (transaction!=null) { + transaction.rollback(); + rolebacked = true; + } + } finally { + if (!rolebacked && transaction!=null) + transaction.commit(); } } - - int i = 1; } } Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=405839&r1=405838&r2=405839&view=diff ============================================================================== --- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original) +++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Fri May 12 12:16:32 2006 @@ -58,11 +58,9 @@ public class Sender extends Thread { private boolean runSender = false; - private boolean stopSenderAfterWork = false; private ArrayList workingSequences = new ArrayList(); private ConfigurationContext context = null; private static final Log log = LogFactory.getLog(Sender.class); - private ThreadPool threadPool = new ThreadPool (); public synchronized void stopSenderForTheSequence(String sequenceID) { workingSequences.remove(sequenceID); @@ -103,29 +101,29 @@ log.debug("End printing Interrupt..."); } + Transaction transaction = null; + boolean rolebacked = false; + try { if (context == null) { String message = "Can't continue the Sender. Context is null"; log.debug(message); throw new SandeshaException(message); } - - Transaction pickMessagesToSendTransaction = storageManager.getTransaction(); - + + transaction = storageManager.getTransaction(); + SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr(); SenderBean senderBean = mgr.getNextMsgToSend(); if (senderBean==null) { - pickMessagesToSendTransaction.commit(); continue; } - MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster(); boolean continueSending = retransmitterAdjuster.adjustRetransmittion(senderBean, context); - if (!continueSending) + if (!continueSending) { continue; - - pickMessagesToSendTransaction.commit(); + } String key = (String) senderBean.getMessageContextRefKey(); MessageContext msgCtx = storageManager.retrieveMessageContext(key, context); @@ -161,8 +159,6 @@ updateMessage(msgCtx); - Transaction preSendTransaction = storageManager.getTransaction(); - int messageType = rmMsgCtx.getMessageType(); if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) { Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); @@ -177,16 +173,16 @@ AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx); } - preSendTransaction.commit(); - //sending the message TransportOutDescription transportOutDescription = msgCtx.getTransportOut(); TransportSender transportSender = transportOutDescription.getSender(); + //have to commit the transaction before sending. This may get changed when WS-AT is available. + transaction.commit(); + boolean successfullySent = false; if (transportSender != null) { try { - //TODO change this to cater for security. transportSender.invoke(msgCtx); successfullySent = true; @@ -194,11 +190,11 @@ // TODO Auto-generated catch block log.debug("Could not send message"); log.debug(e.getStackTrace().toString()); + } finally { + transaction = storageManager.getTransaction(); } } - Transaction postSendTransaction = storageManager.getTransaction(); - // update or delete only if the object is still present. SenderBean bean1 = mgr.retrieve(senderBean.getMessageID()); if (bean1 != null) { @@ -210,14 +206,13 @@ mgr.delete(bean1.getMessageID()); } - postSendTransaction.commit(); // commiting the current transaction - + msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_TRUE); + if (successfullySent) { if (!msgCtx.isServerSide()) checkForSyncResponses(msgCtx); } - - Transaction terminateCleaningTransaction = storageManager.getTransaction(); + if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) { // terminate sending side. TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ); @@ -228,15 +223,22 @@ TerminateManager.terminateSendingSide(configContext,internalSequenceID, msgCtx.isServerSide()); } - terminateCleaningTransaction.commit(); + msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,Sandesha2Constants.VALUE_FALSE); - } catch (AxisFault e) { + } catch (Exception e) { String message = "An Exception was throws in sending"; log.debug(message,e); // TODO : when this is the client side throw the exception to // the client when necessary. + if (transaction!=null) { + transaction.rollback(); + rolebacked = true; + } + } finally { + if (transaction!=null && !rolebacked) + transaction.commit(); } } } @@ -311,7 +313,12 @@ log.debug("Valid SOAP envelope not found"); log.debug(e.getStackTrace().toString()); } - + + //if the request msg ctx is withina a transaction, processing if the response should also happen + //withing the same transaction + responseMessageContext.setProperty(Sandesha2Constants.WITHIN_TRANSACTION + ,msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION)); + if (resenvelope != null) { responseMessageContext.setEnvelope(resenvelope); AxisEngine engine = new AxisEngine(msgCtx --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
