Author: chamikara
Date: Sun Oct 23 19:58:16 2005
New Revision: 327930
URL: http://svn.apache.org/viewcvs?rev=327930&view=rev
Log:
A bug fix - create sequnece operation description has to be InOut.
Refactored code.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/FaultMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/FaultMgr.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/FaultMgr.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/FaultMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/FaultMgr.java Sun Oct
23 19:58:16 2005
@@ -36,12 +36,10 @@
import org.apache.sandesha2.wsrm.SequenceFault;
/**
- * Created by IntelliJ IDEA.
- * User: sanka
- * Date: Oct 9, 2005
- * Time: 11:10:07 PM
- * To change this template use File | Settings | File Templates.
+ * @author Sanka
+ * @author Chamikara
*/
+
public class FaultMgr {
private static final String WSA_ACTION_FAULT
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgCreator.java Sun
Oct 23 19:58:16 2005
@@ -28,6 +28,8 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.OperationDescription;
+import org.apache.axis2.description.OperationDescriptionFactory;
import org.apache.axis2.om.OMAbstractFactory;
import org.apache.axis2.om.impl.MIMEOutputUtils;
import org.apache.axis2.soap.SOAPEnvelope;
@@ -55,6 +57,7 @@
import org.apache.sandesha2.wsrm.TerminateSequence;
import org.apache.wsdl.WSDLConstants;
+
/**
* @author Chamikara
* @author Sanka
@@ -97,9 +100,17 @@
String createSeqMsgId = SandeshaUtil.getUUID();
try {
+ OperationDescription appMsgOperationDesc =
applicationMsgContext.getOperationDescription();
+ OperationDescription createSeqOperationDesc =
OperationDescriptionFactory.getOperetionDescription(OperationDescriptionFactory.MEP_CONSTANT_OUT_IN);
+
createSeqOperationDesc.setPhasesOutFlow(appMsgOperationDesc.getPhasesOutFlow());
+
createSeqOperationDesc.setPhasesOutFaultFlow(appMsgOperationDesc.getPhasesOutFaultFlow());
+
createSeqOperationDesc.setPhasesInFaultFlow(appMsgOperationDesc.getPhasesInFaultFlow());
+
createSeqOperationDesc.setRemainingPhasesInFlow(appMsgOperationDesc.getRemainingPhasesInFlow());
+
+
createSeqmsgContext.setOperationDescription(createSeqOperationDesc);
//TODO set a suitable ope. description
OperationContext createSeqOpContext = new
OperationContext(
-
applicationMsgContext.getOperationDescription());
+
createSeqmsgContext.getOperationDescription());
createSeqmsgContext.setOperationContext(createSeqOpContext);
createSeqOpContext.addMessageContext(createSeqmsgContext);
//registering opearion context
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaException.java
Sun Oct 23 19:58:16 2005
@@ -17,6 +17,10 @@
package org.apache.sandesha2;
+/**
+ * @author Chamikara
+ * @author Sanka
+ */
//FIXME - extends AxisFault and clean code (remove unnecessary try-catches )
public class SandeshaException extends Exception {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java Sun Oct 23
19:58:16 2005
@@ -49,6 +49,11 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.Sequence;
+/**
+ * @author Chamikara
+ * @author Sanka
+ */
+
public class Sender extends Thread {
private boolean senderStarted = false;
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Sun Oct 23 19:58:16 2005
@@ -51,24 +51,15 @@
import org.apache.sandesha2.wsrm.SequenceOffer;
import org.apache.wsdl.WSDLConstants;
-public class SandeshaOutHandler extends AbstractHandler {
-
- public static final Object key = new Object();
+/**
+ * @author Chamikara
+ * @author Sanka
+ */
- public static void waitOnKey() throws InterruptedException {
- synchronized (key) {
- key.wait();
- }
- }
-
- public static void notifyAllWaitingOnKey() {
- synchronized (key) {
- key.notifyAll();
- }
- }
+public class SandeshaOutHandler extends AbstractHandler {
public void invoke(MessageContext msgCtx) throws AxisFault {
-
+
String DONE = (String) msgCtx
.getProperty(Constants.APPLICATION_PROCESSING_DONE);
if (null != DONE && "true".equals(DONE))
@@ -100,6 +91,11 @@
.getInstance(context).getSequencePropretyBeanMgr();
boolean serverSide = msgCtx.isServerSide();
+
+ //setting message Id if null
+ if (msgCtx.getMessageID()==null){
+ msgCtx.setMessageID(SandeshaUtil.getUUID());
+ }
//initial work
//find temp sequence id
String tempSequenceId = null;
@@ -143,12 +139,11 @@
"TO End Point Reference is not
set correctly. This is a must for the sandesha client side.");
tempSequenceId = toEPR.getAddress();
- String sequenceKey = (String)
context.getProperty(Constants.SEQUENCE_KEY);
- if (sequenceKey!=null)
+ String sequenceKey = (String) context
+ .getProperty(Constants.SEQUENCE_KEY);
+ if (sequenceKey != null)
tempSequenceId = tempSequenceId + sequenceKey;
-
-
-
+
}
//check if the fist message
@@ -156,14 +151,14 @@
long messageNumber = getNextMsgNo(context, tempSequenceId);
boolean sendCreateSequence = false;
-
- SequencePropertyBean outSeqBean =
seqPropMgr.retrieve(tempSequenceId,Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
- if ((messageNumber==1) && (outSeqBean==null)) {
+
+ SequencePropertyBean outSeqBean =
seqPropMgr.retrieve(tempSequenceId,
+ Constants.SequenceProperties.OUT_SEQUENCE_ID);
+
+ if ((messageNumber == 1) && (outSeqBean == null)) {
sendCreateSequence = true;
}
-
//if fist message - setup the sequence for the client side
if (!serverSide && sendCreateSequence) {
try {
@@ -188,8 +183,9 @@
seqPropMgr.insert(responseCreateSeqAdded);
try {
- String acksTo = (String)
context.getProperty(Constants.AcksTo);
- addCreateSequenceMessage(rmMsgCtx,
tempSequenceId,acksTo);
+ String acksTo = (String) context
+
.getProperty(Constants.AcksTo);
+ addCreateSequenceMessage(rmMsgCtx,
tempSequenceId, acksTo);
} catch (SandeshaException e1) {
throw new AxisFault(e1.getMessage());
}
@@ -224,12 +220,12 @@
//Changing message Id.
//TODO remove this when Axis2 start sending
uuids as uuid:xxxx
String messageId1 = SandeshaUtil.getUUID();
- if (rmMsgCtx.getMessageId()==null) {
+ if (rmMsgCtx.getMessageId() == null) {
rmMsgCtx.setMessageId(messageId1);
}
//OperationContext opCtx =
msgCtx.getOperationContext();
-//
msgCtx.getSystemContext().registerOperationContext(messageId,
-// opCtx);
+ //
msgCtx.getSystemContext().registerOperationContext(messageId,
+ //
opCtx);
if (serverSide) {
@@ -281,8 +277,6 @@
reqMsgCtx.getOperationContext().setProperty(
org.apache.axis2.Constants.RESPONSE_WRITTEN,
"false");
- msgCtx.setPausedTrue(getName());
-
} else {
//setting reply to FIXME
@@ -298,67 +292,21 @@
throw new
SandeshaException("To EPR is not found");
String to = toEPR.getAddress();
- String operationName =
msgCtx.getOperationContext().getOperationDescription().getName().getLocalPart();
+ String operationName =
msgCtx.getOperationContext()
+
.getOperationDescription().getName()
+ .getLocalPart();
msgCtx.setWSAAction(to + "/" +
operationName);
}
//processing the response
processResponseMessage(rmMsgCtx,
tempSequenceId,
messageNumber);
-
- //pausing the message
- msgCtx.setPausedTrue(getName());
-
-// //Getting the mep.
-// String mep =
msgCtx.getOperationDescription()
-//
.getMessageExchangePattern();
-//
-// if
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {
-// //Add a sequence property to
check weather the response
-// // has arrived.
-// SequencePropertyBean
checkResponseBean = new SequencePropertyBean();
-//
checkResponseBean.setSequenceId(msgCtx.getMessageID());
-// checkResponseBean
-//
.setName(Constants.SequenceProperties.CHECK_RESPONSE);
-//
checkResponseBean.setValue(null);
-//
seqPropMgr.insert(checkResponseBean);
-// }
-//
-// //client side wait
-// boolean letGo = false;
-// while (!letGo) {
-// if
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {
-// //if the mep is in-out
them wait till the response
-// // comes. then pause.
-// SequencePropertyBean
checkResponseBean = seqPropMgr
-//
.retrieve(
-//
msgCtx.getMessageID(),
-//
Constants.SequenceProperties.CHECK_RESPONSE);
-// MessageContext response
= (MessageContext) checkResponseBean.getValue();
-// if
(null!=checkResponseBean.getValue()) {
-// //simply return
to the caller.
-//
//msgCtx.setConfigurationContext( properteies)
-//
//msgCtx.setTransportIn(response.getTransportIn());
-//
msgCtx.setProperty(MessageContext.TRANSPORT_IN,response.getProperty(MessageContext.TRANSPORT_IN));
-//
//msgCtx.setProperty(org.apache.axis2.Constants.tra)
-//
msgCtx.setPausedTrue(getName());
-// letGo = true;
-// continue;
-// }
-// } else {
-// //FIXME - non-inout
case.
-// //TODO check for the
ack and pause.
-// }
-// try {
-// waitOnKey();
-//
-// } catch
(InterruptedException e1) {
-//
System.out.println("Client was interupted...");
-// }
-//
-// }
+
}
+
+ //pausing the message
+ msgCtx.setPausedTrue(getName());
}
} catch (SandeshaException e) {
@@ -373,30 +321,35 @@
if (applicationMsg == null)
throw new SandeshaException("Message context is null");
RMMsgContext createSeqRMMessage =
RMMsgCreator.createCreateSeqMsg(
- applicationRMMsg, tempSequenceId,acksTo);
- CreateSequence createSequencePart = (CreateSequence)
createSeqRMMessage.getMessagePart(Constants.MessageParts.CREATE_SEQ);
- if (createSequencePart==null)
- throw new SandeshaException ("Create Sequence part is
null for a CreateSequence message");
-
+ applicationRMMsg, tempSequenceId, acksTo);
+ CreateSequence createSequencePart = (CreateSequence)
createSeqRMMessage
+
.getMessagePart(Constants.MessageParts.CREATE_SEQ);
+ if (createSequencePart == null)
+ throw new SandeshaException(
+ "Create Sequence part is null for a
CreateSequence message");
+
SequenceOffer offer = createSequencePart.getSequenceOffer();
- if (offer!=null) {
+ if (offer != null) {
//Offer processing
String offeredSequenceId =
offer.getIdentifer().getIdentifier();
- SequencePropertyBean msgsBean = new
SequencePropertyBean ();
+ SequencePropertyBean msgsBean = new
SequencePropertyBean();
msgsBean.setSequenceId(offeredSequenceId);
msgsBean.setName(Constants.SequenceProperties.RECEIVED_MESSAGES);
msgsBean.setValue("");
-
- SequencePropertyBean offeredSequenceBean = new
SequencePropertyBean ();
-
offeredSequenceBean.setName(Constants.SequenceProperties.OFFERED_SEQUENCE);
+
+ SequencePropertyBean offeredSequenceBean = new
SequencePropertyBean();
+ offeredSequenceBean
+
.setName(Constants.SequenceProperties.OFFERED_SEQUENCE);
offeredSequenceBean.setSequenceId(tempSequenceId);
offeredSequenceBean.setValue(offeredSequenceId);
-
- SequencePropertyBeanMgr seqPropMgr =
AbstractBeanMgrFactory.getInstance(applicationMsg.getSystemContext()).getSequencePropretyBeanMgr();
+
+ SequencePropertyBeanMgr seqPropMgr =
AbstractBeanMgrFactory
+
.getInstance(applicationMsg.getSystemContext())
+ .getSequencePropretyBeanMgr();
seqPropMgr.insert(msgsBean);
seqPropMgr.insert(offeredSequenceBean);
}
-
+
MessageContext createSeqMsg =
createSeqRMMessage.getMessageContext();
//TODO remove below
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Sun Oct 23 19:58:16 2005
@@ -48,6 +48,11 @@
import org.apache.sandesha2.wsrm.Nack;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+/**
+ * @author Chamikara
+ * @author Sanka
+ */
+
public class AcknowledgementProcessor implements MsgProcessor {
// public static void notifyAllWaitingOnKey () {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Sun Oct 23 19:58:16 2005
@@ -58,22 +58,24 @@
import org.ietf.jgss.MessageProp;
/**
- * @author
+ * @author Chamikara
+ * @author Sanka
*/
+
public class ApplicationMsgProcessor implements MsgProcessor {
private boolean letInvoke = false;
public void processMessage(RMMsgContext rmMsgCtx) throws
SandeshaException {
-
//Processing for ack if any
- SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement)
rmMsgCtx.getMessagePart(Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
- if (sequenceAck!=null) {
- AcknowledgementProcessor ackProcessor = new
AcknowledgementProcessor ();
+ SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement)
rmMsgCtx
+
.getMessagePart(Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ if (sequenceAck != null) {
+ AcknowledgementProcessor ackProcessor = new
AcknowledgementProcessor();
ackProcessor.processMessage(rmMsgCtx);
}
-
+
//Processing the application message.
MessageContext msgCtx = rmMsgCtx.getMessageContext();
if (msgCtx == null)
@@ -85,9 +87,10 @@
return;
}
- SequencePropertyBeanMgr seqPropMgr =
AbstractBeanMgrFactory.getInstance(rmMsgCtx.getContext()).getSequencePropretyBeanMgr();
-
-
+ SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory
+ .getInstance(rmMsgCtx.getContext())
+ .getSequencePropretyBeanMgr();
+
//setting acked msg no range
Sequence sequence = (Sequence) rmMsgCtx
.getMessagePart(Constants.MessageParts.SEQUENCE);
@@ -95,12 +98,11 @@
ConfigurationContext configCtx = rmMsgCtx.getMessageContext()
.getSystemContext();
if (configCtx == null)
- throw new SandeshaException("Configuration Context is
null");
+ throw new SandeshaException("Configuration Context is
null");
SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
Constants.SequenceProperties.RECEIVED_MESSAGES);
-
long msgNo = sequence.getMessageNumber().getMessageNumber();
if (msgNo == 0)
throw new SandeshaException("Wrong message number");
@@ -117,7 +119,7 @@
//TODO is this enough
msgCtx.setPausedTrue(new
QName(Constants.IN_HANDLER_NAME));
-
+
}
if (messagesStr != "" && messagesStr != null)
@@ -142,56 +144,54 @@
//if (acksToStr.equals(Constants.WSA.NS_URI_ANONYMOUS)) {
- RMMsgContext ackRMMsgCtx =
SandeshaUtil.deepCopy(rmMsgCtx);
- MessageContext ackMsgCtx =
ackRMMsgCtx.getMessageContext();
-
ackMsgCtx.setServiceGroupContext(msgCtx.getServiceGroupContext());
- ackMsgCtx.setServiceGroupContextId(msgCtx
- .getServiceGroupContextId());
- ackMsgCtx.setServiceContext(msgCtx.getServiceContext());
-
ackMsgCtx.setServiceContextID(msgCtx.getServiceContextID());
-
- //TODO set a suitable operation description
- OperationContext ackOpContext = new
OperationContext(msgCtx
- .getOperationDescription());
-
- try {
- ackOpContext.addMessageContext(ackMsgCtx);
- } catch (AxisFault e2) {
- throw new SandeshaException(e2.getMessage());
- }
- ackMsgCtx.setOperationContext(ackOpContext);
+ RMMsgContext ackRMMsgCtx = SandeshaUtil.deepCopy(rmMsgCtx);
+ MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+
ackMsgCtx.setServiceGroupContext(msgCtx.getServiceGroupContext());
+
ackMsgCtx.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
+ ackMsgCtx.setServiceContext(msgCtx.getServiceContext());
+ ackMsgCtx.setServiceContextID(msgCtx.getServiceContextID());
+
+ //TODO set a suitable operation description
+ OperationContext ackOpContext = new OperationContext(msgCtx
+ .getOperationDescription());
+
+ try {
+ ackOpContext.addMessageContext(ackMsgCtx);
+ } catch (AxisFault e2) {
+ throw new SandeshaException(e2.getMessage());
+ }
+ ackMsgCtx.setOperationContext(ackOpContext);
- //Set new envelope
- SOAPEnvelope envelope =
SOAPAbstractFactory.getSOAPFactory(
-
Constants.SOAPVersion.DEFAULT).getDefaultEnvelope();
- try {
- ackMsgCtx.setEnvelope(envelope);
- } catch (AxisFault e3) {
- throw new SandeshaException(e3.getMessage());
- }
+ //Set new envelope
+ SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
+
Constants.SOAPVersion.DEFAULT).getDefaultEnvelope();
+ try {
+ ackMsgCtx.setEnvelope(envelope);
+ } catch (AxisFault e3) {
+ throw new SandeshaException(e3.getMessage());
+ }
- //FIXME set acksTo instead of ReplyTo
- ackMsgCtx.setTo(acksTo);
- ackMsgCtx.setReplyTo(msgCtx.getTo());
- RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId);
-
- AxisEngine engine = new
AxisEngine(ackRMMsgCtx.getMessageContext()
- .getSystemContext());
-
- //set CONTEXT_WRITTEN since acksto is anonymous
-
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-
org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
-
rmMsgCtx.getMessageContext().setProperty(Constants.ACK_WRITTEN,
- "true");
- try {
- engine.send(ackRMMsgCtx.getMessageContext());
- } catch (AxisFault e1) {
- throw new SandeshaException(e1.getMessage());
- }
+ //FIXME set acksTo instead of ReplyTo
+ ackMsgCtx.setTo(acksTo);
+ ackMsgCtx.setReplyTo(msgCtx.getTo());
+ RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId);
+
+ AxisEngine engine = new
AxisEngine(ackRMMsgCtx.getMessageContext()
+ .getSystemContext());
+
+ //set CONTEXT_WRITTEN since acksto is anonymous
+ rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN,
"true");
+ rmMsgCtx.getMessageContext().setProperty(Constants.ACK_WRITTEN,
"true");
+ try {
+ engine.send(ackRMMsgCtx.getMessageContext());
+ } catch (AxisFault e1) {
+ throw new SandeshaException(e1.getMessage());
+ }
-// } else {
-// //TODO Add async Ack
-// }
+ // } else {
+ // //TODO Add async Ack
+ // }
// Pause the messages bean if not the right
message to invoke.
NextMsgBeanMgr mgr =
AbstractBeanMgrFactory.getInstance(configCtx)
@@ -206,87 +206,87 @@
long nextMsgno = bean.getNextMsgNoToProcess();
-
//FIXME - fix delivery assurances for the client side
if (msgCtx.isServerSide()) {
- if (Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE
== Constants.QOS.DeliveryAssurance.IN_ORDER) {
- //pause the message
- msgCtx.setPausedTrue(new
QName(Constants.IN_HANDLER_NAME));
-
- //Adding an entry in the SequencesToInvoke List TODO -
add this to
- // a module init kind of place.
- SequencePropertyBean incomingSequenceListBean =
(SequencePropertyBean) seqPropMgr
-
.retrieve(Constants.SequenceProperties.ALL_SEQUENCES,
-
Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
- if (incomingSequenceListBean == null) {
- ArrayList incomingSequenceList = new
ArrayList();
- incomingSequenceListBean = new
SequencePropertyBean();
- incomingSequenceListBean
-
.setSequenceId(Constants.SequenceProperties.ALL_SEQUENCES);
- incomingSequenceListBean
-
.setName(Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
incomingSequenceListBean.setValue(incomingSequenceList);
+ if
(Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE ==
Constants.QOS.DeliveryAssurance.IN_ORDER) {
+ //pause the message
+ msgCtx.setPausedTrue(new
QName(Constants.IN_HANDLER_NAME));
+
+ //Adding an entry in the SequencesToInvoke List
TODO - add this
+ // to
+ // a module init kind of place.
+ SequencePropertyBean incomingSequenceListBean =
(SequencePropertyBean) seqPropMgr
+ .retrieve(
+
Constants.SequenceProperties.ALL_SEQUENCES,
+
Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (incomingSequenceListBean == null) {
+ ArrayList incomingSequenceList = new
ArrayList();
+ incomingSequenceListBean = new
SequencePropertyBean();
+ incomingSequenceListBean
+
.setSequenceId(Constants.SequenceProperties.ALL_SEQUENCES);
+ incomingSequenceListBean
+
.setName(Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
incomingSequenceListBean.setValue(incomingSequenceList);
- seqPropMgr.insert(incomingSequenceListBean);
- }
+
seqPropMgr.insert(incomingSequenceListBean);
+ }
- //This must be a List :D
- ArrayList incomingSequenceList = (ArrayList)
incomingSequenceListBean
- .getValue();
-
- //Adding current sequence to the incoming sequence List.
- if (!incomingSequenceList.contains(sequenceId)) {
- incomingSequenceList.add(sequenceId);
- }
+ //This must be a List :D
+ ArrayList incomingSequenceList = (ArrayList)
incomingSequenceListBean
+ .getValue();
+
+ //Adding current sequence to the incoming
sequence List.
+ if (!incomingSequenceList.contains(sequenceId))
{
+ incomingSequenceList.add(sequenceId);
+ }
- //saving the message.
- try {
- String key =
SandeshaUtil.storeMessageContext(rmMsgCtx
- .getMessageContext());
- storageMapMgr
- .insert(new StorageMapBean(key,
msgNo, sequenceId));
-
- //This will avoid performing application
processing more than
- // once.
-
rmMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,
- "true");
+ //saving the message.
+ try {
+ String key =
SandeshaUtil.storeMessageContext(rmMsgCtx
+ .getMessageContext());
+ storageMapMgr.insert(new
StorageMapBean(key, msgNo,
+ sequenceId));
+
+ //This will avoid performing
application processing more
+ // than
+ // once.
+
rmMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,
+ "true");
- } catch (Exception ex) {
- throw new SandeshaException(ex.getMessage());
- }
+ } catch (Exception ex) {
+ throw new
SandeshaException(ex.getMessage());
+ }
- //Starting the invoker if stopped.
-
SandeshaUtil.startInvokerIfStopped(msgCtx.getSystemContext());
+ //Starting the invoker if stopped.
+
SandeshaUtil.startInvokerIfStopped(msgCtx.getSystemContext());
+ }
}
- }
-
+
//client side - SET CheckResponse to true.
- //FIXME this will not work. Even in client side inServerSide ()
is true for the messages.
- //if (!msgCtx.isServerSide()) {
- try {
- MessageContext requestMessage =
rmMsgCtx.getMessageContext().getOperationContext().getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- String requestMessageId =
requestMessage.getMessageID();
- SequencePropertyBean checkResponseBean =
seqPropMgr.retrieve(requestMessageId,Constants.SequenceProperties.CHECK_RESPONSE);
- if (checkResponseBean!=null) {
- checkResponseBean.setValue(msgCtx);
- seqPropMgr.update(checkResponseBean);
- }
-
- } catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
+ //FIXME this will not work. Even in client side inServerSide ()
is true
+ // for the messages.
+ try {
+ MessageContext requestMessage =
rmMsgCtx.getMessageContext()
+
.getOperationContext().getMessageContext(
+
WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ String requestMessageId = requestMessage.getMessageID();
+ SequencePropertyBean checkResponseBean =
seqPropMgr.retrieve(
+ requestMessageId,
+
Constants.SequenceProperties.CHECK_RESPONSE);
+ if (checkResponseBean != null) {
+ checkResponseBean.setValue(msgCtx);
+ seqPropMgr.update(checkResponseBean);
}
-
-
- //SET THe RESPONSE
-
- //WAKE UP THE SLEEPING THREADS
- //SandeshaOutHandler.notifyAllWaitingOnKey();
-
- //set
- //}
+
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ //SET THe RESPONSE
+
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Sun Oct 23 19:58:16 2005
@@ -43,7 +43,8 @@
import org.apache.sandesha2.wsrm.SequenceOffer;
/**
- * @author
+ * @author Chamikara
+ * @author Sanka
*/
public class CreateSeqMsgProcessor implements MsgProcessor {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Sun Oct 23 19:58:16 2005
@@ -51,6 +51,11 @@
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
+/**
+ * @author Chamikara
+ * @author Sanka
+ */
+
public class CreateSeqResponseMsgProcessor implements MsgProcessor {
public void processMessage(RMMsgContext createSeqResponseRMMsgCtx)
throws SandeshaException {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
Sun Oct 23 19:58:16 2005
@@ -22,7 +22,8 @@
/**
- * @author
+ * @author Chamikara
+ * @author Sanka
*/
public interface MsgProcessor {
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Sun Oct 23 19:58:16 2005
@@ -26,9 +26,10 @@
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
/**
- * @author
- *
+ * @author Chamikara
+ * @author Sanka
*/
+
public class TerminateSeqMsgProcessor implements MsgProcessor {
public void processMessage(RMMsgContext terminateSeqRMMSg)
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java?rev=327930&r1=327929&r2=327930&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
Sun Oct 23 19:58:16 2005
@@ -25,6 +25,7 @@
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.MsgInitializer;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.beans.RetransmitterBean;
@@ -55,8 +56,7 @@
public boolean insert(RetransmitterBean bean) throws SandeshaException {
if (bean.getMessageId() == null)
- throw new SandeshaException("Key (MessageId) is null.
Cant insert.");
-
+ throw new SandeshaException("Key (MessageId) is null.
Cant insert.");
table.put(bean.getMessageId(), bean);
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]