Author: chamikara
Date: Mon Oct 17 22:27:49 2005
New Revision: 326034
URL: http://svn.apache.org/viewcvs?rev=326034&view=rev
Log:
Client side is working for a single message.
Sandesha recommends the users to use call.invokeNonBlocking with
useSeperateTransport=true for in-out calls.
If they use invokeBlocking it may time out quickly (defined by
InOutMepClient.timeOutInMilliSeconds). Usually RM interactions take some time.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Oct
17 22:27:49 2005
@@ -184,7 +184,7 @@
String NOT_IN_ORDER = "NotInOrder";
- String DEFAULT_DELIVERY_ASSURANCE = IN_ORDER;
+ String DEFAULT_DELIVERY_ASSURANCE = NOT_IN_ORDER;
}
public interface InvocationType {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java Mon Oct 17
22:27:49 2005
@@ -36,6 +36,9 @@
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.OperationDescription;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.soap.SOAPEnvelope;
+import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.sandesha2.client.SandeshaMepClient;
import org.apache.sandesha2.msgreceivers.RMMessageReceiver;
@@ -82,57 +85,13 @@
MessageContext msgCtx = SandeshaUtil
.getStoredMessageContext(key);
-
try {
- RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(msgCtx);
+ RMMsgContext rmMsgCtx = MsgInitializer
+
.initializeMessage(msgCtx);
updateMessage(msgCtx);
- if (msgCtx.isServerSide())
- new
AxisEngine(context).send(msgCtx);
- else {
-
-//
//TwoWayTransportBasedSender.send(msgCtx, msgCtx.getTransportIn());
-//
-// //boolean invokeBlocking =
isInvocationTypeBlocking (rmMsgCtx);
-//
-// //if
(msgCtx.getOperationDescription().getMessageExchangePattern()==req-res)
-// //{
-// InOutMEPClient inoutClient =
new InOutMEPClient (msgCtx.getServiceContext());
-// Call call = new Call ();
-// call.in
-//
inoutClient.setTransportInfo(msgCtx.get);
-// if (invokeBlocking){
-//
inoutClient.invokeBlocking(msgCtx.getOperationDescription(),msgCtx);
-// }else {
-//
inoutClient.invokeNonBlocking(msgCtx.getOperationDescription(),msgCtx,new
SandeshaCallback ());
-// }
-// //}
-
-
- boolean responseExpected =
isResponseExpected (rmMsgCtx);
-
- if (responseExpected){
- //Call inOutMepClient =
new Call (msgCtx.getServiceContext());
-
//inOutMepClient.setTo(msgCtx.getTo());
-
- //this will start the
listner.
-
- SandeshaMepClient
inOutMepClient = new SandeshaMepClient (msgCtx.getServiceContext());
-
//inOutMepClient.setTransportInfo(org.apache.axis2.Constants.TRANSPORT_HTTP,org.apache.axis2.Constants.TRANSPORT_HTTP,true);
-
inOutMepClient.setTo(msgCtx.getTo());
-
inOutMepClient.setTransportInfo(org.apache.axis2.Constants.TRANSPORT_HTTP,org.apache.axis2.Constants.TRANSPORT_HTTP,true);
-
inOutMepClient.invokeDual(msgCtx.getOperationDescription(),msgCtx);
-
//inOutMepClient.setTransportInfo(org.apache.axis2.Constants.TRANSPORT_HTTP,org.apache.axis2.Constants.TRANSPORT_HTTP,false);
-
//call.invokeBlocking(msgCtx.getOperationDescription(),msgCtx);
- }else {
- MessageSender sender =
new MessageSender ();
-
sender.setTo(msgCtx.getTo());
-
sender.send(msgCtx.getOperationDescription(),msgCtx);
- }
-
-
-
- }
+ new AxisEngine(context).send(msgCtx);
+ checkForSyncResponses(msgCtx);
} catch (AxisFault e1) {
e1.printStackTrace();
@@ -163,22 +122,24 @@
}
- private boolean isResponseExpected (RMMsgContext rmMsgCtx) {
+ private boolean isResponseExpected(RMMsgContext rmMsgCtx) {
boolean responseExpected = false;
-
- if
(rmMsgCtx.getMessageType()==Constants.MessageTypes.CREATE_SEQ){
+
+ if (rmMsgCtx.getMessageType() ==
Constants.MessageTypes.CREATE_SEQ) {
responseExpected = true;
- }if
(rmMsgCtx.getMessageType()==Constants.MessageTypes.APPLICATION) {
+ }
+ if (rmMsgCtx.getMessageType() ==
Constants.MessageTypes.APPLICATION) {
//a ack may arrive. (not a application response)
- if
(rmMsgCtx.getMessageContext().getOperationDescription().getMessageExchangePattern().equals(
-
org.apache.wsdl.WSDLConstants.MEP_URI_IN_OUT)) {
- responseExpected = true;
+ if
(rmMsgCtx.getMessageContext().getOperationDescription()
+ .getMessageExchangePattern().equals(
+
org.apache.wsdl.WSDLConstants.MEP_URI_IN_OUT)) {
+ responseExpected = true;
}
}
-
+
return true;
}
-
+
public void start(ConfigurationContext context) {
senderStarted = true;
this.context = context;
@@ -189,14 +150,50 @@
try {
RMMsgContext rmMsgCtx1 =
MsgInitializer.initializeMessage(msgCtx1);
rmMsgCtx1.addSOAPEnvelope();
-
-
-
+
} catch (AxisFault e) {
throw new SandeshaException("Exception in updating
contexts");
}
-
-
+
+ }
+
+ private void checkForSyncResponses(MessageContext msgCtx) throws
AxisFault {
+
+ boolean responsePresent = (msgCtx
+ .getProperty(MessageContext.TRANSPORT_IN) !=
null);
+
+ if (responsePresent) {
+ //create the response
+ MessageContext response = new MessageContext(msgCtx
+ .getSystemContext(),
msgCtx.getSessionContext(), msgCtx
+ .getTransportIn(),
msgCtx.getTransportOut());
+ response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+
.getProperty(MessageContext.TRANSPORT_IN));
+
+ response.setServerSide(false);
+
+ //If request is REST we assume the response is REST, so
set the
+ // variable
+ response.setDoingREST(msgCtx.isDoingREST());
+
response.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
+
response.setServiceGroupContext(msgCtx.getServiceGroupContext());
+ response.setServiceContext(msgCtx.getServiceContext());
+
response.setServiceDescription(msgCtx.getServiceDescription());
+
response.setServiceGroupDescription(msgCtx.getServiceGroupDescription());
+
+ //Changed following from TransportUtils to SandeshaUtil
since op. context is anavailable.
+ SOAPEnvelope resenvelope =
SandeshaUtil.createSOAPMessage(
+ response,
msgCtx.getEnvelope().getNamespace().getName());
+
+ if (resenvelope != null) {
+ AxisEngine engine = new
AxisEngine(msgCtx.getSystemContext());
+ response.setEnvelope(resenvelope);
+ engine.receive(response);
+ } else {
+ throw new AxisFault(Messages
+
.getMessage("blockInvocationExpectsRes="));
+ }
+ }
}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
Mon Oct 17 22:27:49 2005
@@ -47,6 +47,7 @@
OperationContext newOperationContext = new
OperationContext (msgctx.getOperationDescription());
+ newOperationContext.setProperty("test","test123");
msgctx.getOperationDescription().registerOperationContext(response,
newOperationContext);
response.setServerSide(false);
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
Mon Oct 17 22:27:49 2005
@@ -59,6 +59,8 @@
} catch (SandeshaException ex) {
throw new AxisFault("Cant initialize the message");
}
+
+ System.out.println("Got message of type:" +
rmMsgCtx.getMessageType() + " MessageId:" + msgCtx.getMessageID());
// try {
// System.out.println("SandeshaInHandler Got a message of
type:" + rmMsgCtx.getMessageType());
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
Mon Oct 17 22:27:49 2005
@@ -51,6 +51,20 @@
public class SandeshaOutHandler extends AbstractHandler {
+ public static final Object key = new Object();
+
+ public static void waitOnKey() throws InterruptedException {
+ synchronized (key) {
+ key.wait();
+ }
+ }
+
+ public static void notifyAllWaitingOnKey() {
+ synchronized (key) {
+ key.notifyAll();
+ }
+ }
+
public void invoke(MessageContext msgCtx) throws AxisFault {
System.out.println("Sandesha out handler called");
@@ -59,8 +73,8 @@
if (null != DONE && "true".equals(DONE))
return;
-
msgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,"true");
-
+ msgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,
"true");
+
//getting rm message
RMMsgContext rmMsgCtx = null;
try {
@@ -205,14 +219,17 @@
//valid response
-
//Changing message Id.
//TODO remove this when Axis2 start sending
uuids as uuid:xxxx
- String messageId = SandeshaUtil.getUUID();
- rmMsgCtx.setMessageId(messageId);
- OperationContext opCtx =
msgCtx.getOperationContext();
-
msgCtx.getSystemContext().registerOperationContext(messageId, opCtx);
-
+ String messageId1 = SandeshaUtil.getUUID();
+ if (rmMsgCtx.getMessageId()==null) {
+ rmMsgCtx.setMessageId(messageId1);
+ System.out.println("Message id was
null");
+ }
+ //OperationContext opCtx =
msgCtx.getOperationContext();
+//
msgCtx.getSystemContext().registerOperationContext(messageId,
+// opCtx);
+
if (serverSide) {
//FIXME - do not copy application
messages. Coz u loose
@@ -242,8 +259,9 @@
newMsgCtx.setOperationContext(newOpContext);
//Thid does not have to be processed
again by RMHandlers
-
newMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,"true");
-
+ newMsgCtx.setProperty(
+
Constants.APPLICATION_PROCESSING_DONE, "true");
+
//processing the response
processResponseMessage(newRMMsgCtx,
tempSequenceId,
messageNumber);
@@ -267,8 +285,9 @@
} else {
//setting reply to FIXME
- //msgCtx.setReplyTo(new
EndpointReference ("http://localhost:9070/somethingWorking"));
-
+ //msgCtx.setReplyTo(new
EndpointReference
+ //
("http://localhost:9070/somethingWorking"));
+
//Setting WSA Action if null
//TODO: Recheck weather this action is
correct
if (msgCtx.getWSAAction() == null) {
@@ -282,55 +301,62 @@
.getAxisOperation().getName().getLocalPart();
msgCtx.setWSAAction(to + "/" +
operationName);
}
-
+
//processing the response
processResponseMessage(rmMsgCtx,
tempSequenceId,
messageNumber);
- //Getting the mep.
- String mep =
msgCtx.getOperationDescription()
-
.getMessageExchangePattern();
-
- if
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {
- //Add a sequence property to
check weather the response has arrived.
- SequencePropertyBean
checkResponseBean = new SequencePropertyBean ();
-
checkResponseBean.setSequenceId(msgCtx.getMessageID());
-
checkResponseBean.setName(Constants.SequenceProperties.CHECK_RESPONSE);
-
checkResponseBean.setValue("false");
-
seqPropMgr.insert(checkResponseBean);
- }
-
- //client side wait
- boolean letGo = false;
- while (!letGo) {
- if
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {
- //if the mep is in-out
them wait till the response comes. then pause.
- SequencePropertyBean
checkResponseBean =
seqPropMgr.retrieve(msgCtx.getMessageID(),Constants.SequenceProperties.CHECK_RESPONSE);
- String val = (String)
checkResponseBean.getValue();
- if
("true".equals(checkResponseBean.getValue())) {
-
msgCtx.setPausedTrue(getName());
- letGo = true;
- }
- } else {
- //FIXME - non-inout
case.
- //if not in-out simply
pause after the
- SequencePropertyBean
outSequenceBean = seqPropMgr
-
.retrieve(
-
tempSequenceId,
-
Constants.SequenceProperties.OUT_SEQUENCE_ID);
- if (outSequenceBean ==
null) {
- try {
-
//Thread.sleep(Constants.CLIENT_SLEEP_TIME);
- wait();
- } catch
(InterruptedException e1) {
-
System.out
-
.println("Client was interupted...");
- }
- } else {
- letGo = true;
- }
- }
- }
+ //pausing the message
+ msgCtx.setPausedTrue(getName());
+
+// //Getting the mep.
+// String mep =
msgCtx.getOperationDescription()
+//
.getMessageExchangePattern();
+//
+// if
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {
+// //Add a sequence property to
check weather the response
+// // has arrived.
+// SequencePropertyBean
checkResponseBean = new SequencePropertyBean();
+//
checkResponseBean.setSequenceId(msgCtx.getMessageID());
+// checkResponseBean
+//
.setName(Constants.SequenceProperties.CHECK_RESPONSE);
+//
checkResponseBean.setValue(null);
+//
seqPropMgr.insert(checkResponseBean);
+// }
+//
+// //client side wait
+// boolean letGo = false;
+// while (!letGo) {
+// if
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {
+// //if the mep is in-out
them wait till the response
+// // comes. then pause.
+// SequencePropertyBean
checkResponseBean = seqPropMgr
+//
.retrieve(
+//
msgCtx.getMessageID(),
+//
Constants.SequenceProperties.CHECK_RESPONSE);
+// MessageContext response
= (MessageContext) checkResponseBean.getValue();
+// if
(null!=checkResponseBean.getValue()) {
+// //simply return
to the caller.
+//
//msgCtx.setConfigurationContext( properteies)
+//
//msgCtx.setTransportIn(response.getTransportIn());
+//
msgCtx.setProperty(MessageContext.TRANSPORT_IN,response.getProperty(MessageContext.TRANSPORT_IN));
+//
//msgCtx.setProperty(org.apache.axis2.Constants.tra)
+//
msgCtx.setPausedTrue(getName());
+// letGo = true;
+// continue;
+// }
+// } else {
+// //FIXME - non-inout
case.
+// //TODO check for the
ack and pause.
+// }
+// try {
+// waitOnKey();
+//
+// } catch
(InterruptedException e1) {
+//
System.out.println("Client was interupted...");
+// }
+//
+// }
}
}
@@ -349,11 +375,11 @@
RMMsgContext createSeqRMMessage =
RMMsgCreator.createCreateSeqMsg(
applicationRMMsg, tempSequenceId);
MessageContext createSeqMsg =
createSeqRMMessage.getMessageContext();
-
-
+
//TODO remove below
- //createSeqMsg.setReplyTo(new EndpointReference
("http://localhost:9070/somethingWorking"));
-
+ //createSeqMsg.setReplyTo(new EndpointReference
+ // ("http://localhost:9070/somethingWorking"));
+
createSeqMsg.setRelatesTo(null); //create seq msg does not
relateTo
// anything
AbstractContext context = applicationRMMsg.getContext();
@@ -387,13 +413,6 @@
MessageContext msg = rmMsg.getMessageContext();
-// //Changing message Id.
-// //TODO remove this when Axis2 start sending uuids as uuid:xxxx
-// String messageId = SandeshaUtil.getUUID();
-// rmMsg.setMessageId(messageId);
-// OperationContext opCtx = msg.getOperationContext();
-// msg.getSystemContext().registerOperationContext(messageId,
opCtx);
-
if (rmMsg == null)
throw new SandeshaException("Message or reques message
is null");
@@ -480,8 +499,10 @@
} else {
//client side
- Object obj = msg.getProperty(Constants.LAST_MESSAGE);
- //if (obj != null && "true".equals(obj)) {
+
+ Object obj = msg.getSystemContext().getProperty(
+ Constants.LAST_MESSAGE);
+ if (obj != null && "true".equals(obj)) {
sequence.setLastMessage(new LastMessage());
//saving the last message no.
SequencePropertyBean lastOutMsgBean = new
SequencePropertyBean(
@@ -489,7 +510,7 @@
Constants.SequenceProperties.LAST_OUT_MESSAGE,
new Long(messageNumber));
sequencePropertyMgr.insert(lastOutMsgBean);
- //}
+ }
}
//setting the Sequnece id.
@@ -514,26 +535,26 @@
throw new SandeshaException(e1.getMessage());
}
-// //send the message through sender only in the server case.
-// //in the client case use the normal flow.
-// if (msg.isServerSide()) {
- //Retransmitter bean entry for the application message
- RetransmitterBean appMsgEntry = new RetransmitterBean();
- String key = SandeshaUtil.storeMessageContext(rmMsg
- .getMessageContext());
- appMsgEntry.setKey(key);
- appMsgEntry.setLastSentTime(0);
- appMsgEntry.setMessageId(rmMsg.getMessageId());
- appMsgEntry.setMessageNumber(messageNumber);
- if (outSequenceBean == null ||
outSequenceBean.getValue() == null) {
- appMsgEntry.setSend(false);
- } else {
- appMsgEntry.setSend(true);
+ // //send the message through sender only in the
server case.
+ // //in the client case use the normal flow.
+ // if (msg.isServerSide()) {
+ //Retransmitter bean entry for the application message
+ RetransmitterBean appMsgEntry = new RetransmitterBean();
+ String key = SandeshaUtil
+ .storeMessageContext(rmMsg.getMessageContext());
+ appMsgEntry.setKey(key);
+ appMsgEntry.setLastSentTime(0);
+ appMsgEntry.setMessageId(rmMsg.getMessageId());
+ appMsgEntry.setMessageNumber(messageNumber);
+ if (outSequenceBean == null || outSequenceBean.getValue() ==
null) {
+ appMsgEntry.setSend(false);
+ } else {
+ appMsgEntry.setSend(true);
- }
- appMsgEntry.setTempSequenceId(tempSequenceId);
- retransmitterMgr.insert(appMsgEntry);
-// }
+ }
+ appMsgEntry.setTempSequenceId(tempSequenceId);
+ retransmitterMgr.insert(appMsgEntry);
+ // }
}
private long getNextMsgNo(ConfigurationContext context,
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Mon Oct 17 22:27:49 2005
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
+import javax.xml.namespace.QName;
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
@@ -35,6 +36,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.RMMsgCreator;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.handlers.SandeshaOutHandler;
import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
@@ -48,6 +50,11 @@
public class AcknowledgementProcessor implements MsgProcessor {
+ public static void notifyAllWaitingOnKey () {
+
+ SandeshaOutHandler.key.notifyAll();
+ }
+
public void processMessage(RMMsgContext rmMsgCtx) throws
SandeshaException {
SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement)
rmMsgCtx
@@ -61,6 +68,7 @@
Iterator ackRangeIterator =
sequenceAck.getAcknowledgementRanges()
.iterator();
+
Iterator nackIterator = sequenceAck.getNackList().iterator();
String outSequenceId =
sequenceAck.getIdentifier().getIdentifier();
if (outSequenceId == null || "".equals(outSequenceId))
@@ -71,8 +79,6 @@
SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory
.getInstance(context).getSequencePropretyBeanMgr();
-
-
SequencePropertyBean tempSequenceBean = seqPropMgr.retrieve(
outSequenceId,
Constants.SequenceProperties.TEMP_SEQUENCE_ID);
@@ -140,6 +146,9 @@
}
int i = 1;
+
+ //stopping the progress of the message further.
+ rmMsgCtx.getMessageContext().setPausedTrue(new QName
(Constants.IN_HANDLER_NAME));
}
private RetransmitterBean getRetransmitterEntry(Collection collection,
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
---
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Mon Oct 17 22:27:49 2005
@@ -41,6 +41,7 @@
import org.apache.sandesha2.RMMsgCreator;
import org.apache.sandesha2.SOAPAbstractFactory;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.handlers.SandeshaOutHandler;
import org.apache.sandesha2.msgreceivers.RMMessageReceiver;
import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
@@ -202,7 +203,9 @@
long nextMsgno = bean.getNextMsgNoToProcess();
+
//FIXME - fix delivery assurances for the client side
+
if (msgCtx.isServerSide()) {
if (Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE
== Constants.QOS.DeliveryAssurance.IN_ORDER) {
//pause the message
@@ -258,13 +261,14 @@
}
//client side - SET CheckResponse to true.
- if (!msgCtx.isServerSide()) {
+ //FIXME this will not work. Even in client side inServerSide ()
is true for the messages.
+ //if (!msgCtx.isServerSide()) {
try {
MessageContext requestMessage =
rmMsgCtx.getMessageContext().getOperationContext().getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
String requestMessageId =
requestMessage.getMessageID();
SequencePropertyBean checkResponseBean =
seqPropMgr.retrieve(requestMessageId,Constants.SequenceProperties.CHECK_RESPONSE);
if (checkResponseBean!=null) {
- checkResponseBean.setValue("true");
+ checkResponseBean.setValue(msgCtx);
seqPropMgr.update(checkResponseBean);
}
@@ -272,8 +276,14 @@
throw new SandeshaException (e.getMessage());
}
+
+ //SET THe RESPONSE
+
+ //WAKE UP THE SLEEPING THREADS
+ //SandeshaOutHandler.notifyAllWaitingOnKey();
+
//set
- }
+ //}
}
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
Mon Oct 17 22:27:49 2005
@@ -17,6 +17,7 @@
package org.apache.sandesha2.util;
import java.awt.datatransfer.StringSelection;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -24,20 +25,31 @@
import java.util.Iterator;
import java.util.StringTokenizer;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamReader;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.MessageInformationHeaders;
import org.apache.axis2.addressing.miheaders.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.i18n.Messages;
import org.apache.axis2.om.OMElement;
import org.apache.axis2.om.impl.MIMEOutputUtils;
+import org.apache.axis2.om.impl.llom.builder.StAXBuilder;
+import org.apache.axis2.om.impl.llom.builder.StAXOMBuilder;
import org.apache.axis2.soap.SOAPEnvelope;
+import org.apache.axis2.soap.SOAPFactory;
+import org.apache.axis2.soap.impl.llom.builder.StAXSOAPModelBuilder;
+import org.apache.axis2.soap.impl.llom.soap11.SOAP11Factory;
import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
import org.apache.axis2.util.UUIDGenerator;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.sandesha2.Constants;
@@ -328,7 +340,57 @@
return false;
}
-// public SOAPEnvelope cloneSOAPEnvelope (SOAPEnvelope oldEnvelope) {
-//
-// }
+ public static SOAPEnvelope createSOAPMessage (MessageContext
msgContext, String soapNamespaceURI) throws AxisFault {
+ try {
+
+ InputStream inStream = (InputStream) msgContext.getProperty(
+ MessageContext.TRANSPORT_IN);
+ msgContext.setProperty(MessageContext.TRANSPORT_IN, null);
+ //this inputstram is set by the TransportSender represents a two
way transport or
+ //by a Transport Recevier
+ if (inStream == null) {
+ throw new AxisFault(Messages.getMessage("inputstreamNull"));
+ }
+
+ //This should be set later
+ //TODO check weather this affects MTOM
+ String contentType = null;
+
+ StAXBuilder builder = null;
+ SOAPEnvelope envelope = null;
+
+ String charSetEnc =
(String)msgContext.getProperty(MessageContext.CHARACTER_SET_ENCODING);
+ if(charSetEnc == null) {
+ charSetEnc = MessageContext.DEFAULT_CHAR_SET_ENCODING;
+ }
+
+ if (contentType != null) {
+ msgContext.setDoingMTOM(true);
+ builder =
+ HTTPTransportUtils.selectBuilderForMIME(msgContext,
+ inStream,
+ (String) contentType);
+ envelope = (SOAPEnvelope) builder.getDocumentElement();
+ } else if (msgContext.isDoingREST()) {
+ XMLStreamReader xmlreader =
+ XMLInputFactory.newInstance().createXMLStreamReader(
+ inStream,charSetEnc);
+ SOAPFactory soapFactory = new SOAP11Factory();
+ builder = new StAXOMBuilder(xmlreader);
+ builder.setOmbuilderFactory(soapFactory);
+ envelope = soapFactory.getDefaultEnvelope();
+ envelope.getBody().addChild(builder.getDocumentElement());
+ } else {
+ XMLStreamReader xmlreader =
+ XMLInputFactory.newInstance().createXMLStreamReader(
+ inStream,charSetEnc);
+ builder = new StAXSOAPModelBuilder(xmlreader,
soapNamespaceURI);
+ envelope = (SOAPEnvelope) builder.getDocumentElement();
+ }
+ return envelope;
+ } catch (Exception e) {
+ throw new AxisFault(e);
+ }
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]