Author: chamikara
Date: Mon Oct 17 22:27:49 2005
New Revision: 326034

URL: http://svn.apache.org/viewcvs?rev=326034&view=rev
Log:
Client side is working for a single message.
Sandesha recommends the users to use call.invokeNonBlocking with 
useSeperateTransport=true for in-out calls.

If they use invokeBlocking it may time out quickly (defined by 
InOutMepClient.timeOutInMilliSeconds). Usually RM interactions take some time.

Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Oct 
17 22:27:49 2005
@@ -184,7 +184,7 @@
 

                        String NOT_IN_ORDER = "NotInOrder";

 

-                       String DEFAULT_DELIVERY_ASSURANCE = IN_ORDER;

+                       String DEFAULT_DELIVERY_ASSURANCE = NOT_IN_ORDER;

                }

                

                public interface InvocationType {


Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sender.java Mon Oct 17 
22:27:49 2005
@@ -36,6 +36,9 @@
 import org.apache.axis2.context.OperationContext;

 import org.apache.axis2.description.OperationDescription;

 import org.apache.axis2.engine.AxisEngine;

+import org.apache.axis2.i18n.Messages;

+import org.apache.axis2.soap.SOAPEnvelope;

+import org.apache.axis2.transport.TransportUtils;

 import org.apache.axis2.wsdl.WSDLConstants;

 import org.apache.sandesha2.client.SandeshaMepClient;

 import org.apache.sandesha2.msgreceivers.RMMessageReceiver;

@@ -82,57 +85,13 @@
                                MessageContext msgCtx = SandeshaUtil

                                                .getStoredMessageContext(key);

 

-       

                                try {

-                                       RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);

+                                       RMMsgContext rmMsgCtx = MsgInitializer

+                                                       
.initializeMessage(msgCtx);

                                        updateMessage(msgCtx);

 

-                                       if (msgCtx.isServerSide())

-                                               new 
AxisEngine(context).send(msgCtx);

-                                       else {

-                                               

-//                                             
//TwoWayTransportBasedSender.send(msgCtx, msgCtx.getTransportIn());

-//                                             

-//                                             //boolean invokeBlocking = 
isInvocationTypeBlocking (rmMsgCtx);

-//                                             

-//                                             //if 
(msgCtx.getOperationDescription().getMessageExchangePattern()==req-res)

-//                                             //{

-//                                             InOutMEPClient inoutClient = 
new InOutMEPClient (msgCtx.getServiceContext());

-//                                             Call call = new Call ();

-//                                             call.in

-//                                             
inoutClient.setTransportInfo(msgCtx.get);

-//                                             if (invokeBlocking){

-//                                                     
inoutClient.invokeBlocking(msgCtx.getOperationDescription(),msgCtx);

-//                                             }else {

-//                                                     
inoutClient.invokeNonBlocking(msgCtx.getOperationDescription(),msgCtx,new 
SandeshaCallback ());

-//                                             }

-//                                             //}

-                                               

-                                               

-                                               boolean responseExpected = 
isResponseExpected (rmMsgCtx);

-                                               

-                                               if (responseExpected){

-                                                       //Call inOutMepClient = 
new Call (msgCtx.getServiceContext());

-                                                       
//inOutMepClient.setTo(msgCtx.getTo());

-                                                       

-                                                       //this will start the 
listner.

-                               

-                                                       SandeshaMepClient 
inOutMepClient = new SandeshaMepClient (msgCtx.getServiceContext());

-                                                       
//inOutMepClient.setTransportInfo(org.apache.axis2.Constants.TRANSPORT_HTTP,org.apache.axis2.Constants.TRANSPORT_HTTP,true);

-                                                       
inOutMepClient.setTo(msgCtx.getTo());

-                                                       
inOutMepClient.setTransportInfo(org.apache.axis2.Constants.TRANSPORT_HTTP,org.apache.axis2.Constants.TRANSPORT_HTTP,true);

-                                                       
inOutMepClient.invokeDual(msgCtx.getOperationDescription(),msgCtx);

-                                                       
//inOutMepClient.setTransportInfo(org.apache.axis2.Constants.TRANSPORT_HTTP,org.apache.axis2.Constants.TRANSPORT_HTTP,false);

-                                                       
//call.invokeBlocking(msgCtx.getOperationDescription(),msgCtx);

-                                               }else {

-                                                       MessageSender sender = 
new MessageSender ();

-                                                       
sender.setTo(msgCtx.getTo());

-                                                       
sender.send(msgCtx.getOperationDescription(),msgCtx);

-                                               }

-

-                                               

-                                               

-                                       }

+                                       new AxisEngine(context).send(msgCtx);

+                                       checkForSyncResponses(msgCtx);

 

                                } catch (AxisFault e1) {

                                        e1.printStackTrace();

@@ -163,22 +122,24 @@
 

        }

 

-       private boolean isResponseExpected (RMMsgContext rmMsgCtx) {

+       private boolean isResponseExpected(RMMsgContext rmMsgCtx) {

                boolean responseExpected = false;

-               

-               if 
(rmMsgCtx.getMessageType()==Constants.MessageTypes.CREATE_SEQ){

+

+               if (rmMsgCtx.getMessageType() == 
Constants.MessageTypes.CREATE_SEQ) {

                        responseExpected = true;

-               }if 
(rmMsgCtx.getMessageType()==Constants.MessageTypes.APPLICATION) {

+               }

+               if (rmMsgCtx.getMessageType() == 
Constants.MessageTypes.APPLICATION) {

                        //a ack may arrive. (not a application response)

-                       if 
(rmMsgCtx.getMessageContext().getOperationDescription().getMessageExchangePattern().equals(

-                                       
org.apache.wsdl.WSDLConstants.MEP_URI_IN_OUT)) {

-                                       responseExpected = true;

+                       if 
(rmMsgCtx.getMessageContext().getOperationDescription()

+                                       .getMessageExchangePattern().equals(

+                                                       
org.apache.wsdl.WSDLConstants.MEP_URI_IN_OUT)) {

+                               responseExpected = true;

                        }

                }

-               

+

                return true;

        }

-       

+

        public void start(ConfigurationContext context) {

                senderStarted = true;

                this.context = context;

@@ -189,14 +150,50 @@
                try {

                        RMMsgContext rmMsgCtx1 = 
MsgInitializer.initializeMessage(msgCtx1);

                        rmMsgCtx1.addSOAPEnvelope();

-                       

-               

-                       

+

                } catch (AxisFault e) {

                        throw new SandeshaException("Exception in updating 
contexts");

                }

-               

-               

+

+       }

+

+       private void checkForSyncResponses(MessageContext msgCtx) throws 
AxisFault {

+

+               boolean responsePresent = (msgCtx

+                               .getProperty(MessageContext.TRANSPORT_IN) != 
null);

+

+               if (responsePresent) {

+                       //create the response

+                       MessageContext response = new MessageContext(msgCtx

+                                       .getSystemContext(), 
msgCtx.getSessionContext(), msgCtx

+                                       .getTransportIn(), 
msgCtx.getTransportOut());

+                       response.setProperty(MessageContext.TRANSPORT_IN, msgCtx

+                                       
.getProperty(MessageContext.TRANSPORT_IN));

+

+                       response.setServerSide(false);

+

+                       //If request is REST we assume the response is REST, so 
set the

+                       // variable

+                       response.setDoingREST(msgCtx.isDoingREST());

+                       
response.setServiceGroupContextId(msgCtx.getServiceGroupContextId());

+                       
response.setServiceGroupContext(msgCtx.getServiceGroupContext());

+                       response.setServiceContext(msgCtx.getServiceContext());

+                       
response.setServiceDescription(msgCtx.getServiceDescription());

+                       
response.setServiceGroupDescription(msgCtx.getServiceGroupDescription());

+                       

+                       //Changed following from TransportUtils to SandeshaUtil 
since op. context is anavailable.

+                       SOAPEnvelope resenvelope = 
SandeshaUtil.createSOAPMessage(

+                                       response, 
msgCtx.getEnvelope().getNamespace().getName());

+

+                       if (resenvelope != null) {

+                               AxisEngine engine = new 
AxisEngine(msgCtx.getSystemContext());

+                               response.setEnvelope(resenvelope);

+                               engine.receive(response);

+                       } else {

+                               throw new AxisFault(Messages

+                                               
.getMessage("blockInvocationExpectsRes="));

+                       }

+               }

        }

 

 }

Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/client/TwoWayOptionalTransportBasedSender.java
 Mon Oct 17 22:27:49 2005
@@ -47,6 +47,7 @@
                        

                        OperationContext newOperationContext = new 
OperationContext (msgctx.getOperationDescription());

                        

+                       newOperationContext.setProperty("test","test123");

                        
msgctx.getOperationDescription().registerOperationContext(response,

                                        newOperationContext);

                        response.setServerSide(false);


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 Mon Oct 17 22:27:49 2005
@@ -59,6 +59,8 @@
                } catch (SandeshaException ex) {

                        throw new AxisFault("Cant initialize the message");

                }

+               

+               System.out.println("Got message of type:" + 
rmMsgCtx.getMessageType() + " MessageId:" + msgCtx.getMessageID());

 

 //             try {

 //                     System.out.println("SandeshaInHandler Got a message of 
type:" + rmMsgCtx.getMessageType());


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
 Mon Oct 17 22:27:49 2005
@@ -51,6 +51,20 @@
 

 public class SandeshaOutHandler extends AbstractHandler {

 

+       public static final Object key = new Object();

+

+       public static void waitOnKey() throws InterruptedException {

+               synchronized (key) {

+                       key.wait();

+               }

+       }

+

+       public static void notifyAllWaitingOnKey() {

+               synchronized (key) {

+                       key.notifyAll();

+               }

+       }

+

        public void invoke(MessageContext msgCtx) throws AxisFault {

                System.out.println("Sandesha out handler called");

 

@@ -59,8 +73,8 @@
                if (null != DONE && "true".equals(DONE))

                        return;

 

-               
msgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,"true");

-               

+               msgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE, 
"true");

+

                //getting rm message

                RMMsgContext rmMsgCtx = null;

                try {

@@ -205,14 +219,17 @@
 

                                //valid response

 

-                               

                                //Changing message Id.

                                //TODO remove this when Axis2 start sending 
uuids as uuid:xxxx

-                               String messageId = SandeshaUtil.getUUID();

-                               rmMsgCtx.setMessageId(messageId);

-                               OperationContext opCtx = 
msgCtx.getOperationContext();

-                               
msgCtx.getSystemContext().registerOperationContext(messageId, opCtx);

-                               

+                               String messageId1 = SandeshaUtil.getUUID();

+                               if (rmMsgCtx.getMessageId()==null) {

+                                       rmMsgCtx.setMessageId(messageId1);

+                                       System.out.println("Message id was 
null");

+                               }

+                               //OperationContext opCtx = 
msgCtx.getOperationContext();

+//                             
msgCtx.getSystemContext().registerOperationContext(messageId,

+//                                             opCtx);

+

                                if (serverSide) {

 

                                        //FIXME - do not copy application 
messages. Coz u loose

@@ -242,8 +259,9 @@
                                        
newMsgCtx.setOperationContext(newOpContext);

 

                                        //Thid does not have to be processed 
again by RMHandlers

-                                       
newMsgCtx.setProperty(Constants.APPLICATION_PROCESSING_DONE,"true");

-                                       

+                                       newMsgCtx.setProperty(

+                                                       
Constants.APPLICATION_PROCESSING_DONE, "true");

+

                                        //processing the response

                                        processResponseMessage(newRMMsgCtx, 
tempSequenceId,

                                                        messageNumber);

@@ -267,8 +285,9 @@
                                } else {

 

                                        //setting reply to FIXME

-                                       //msgCtx.setReplyTo(new 
EndpointReference ("http://localhost:9070/somethingWorking";));

-                                       

+                                       //msgCtx.setReplyTo(new 
EndpointReference

+                                       // 
("http://localhost:9070/somethingWorking";));

+

                                        //Setting WSA Action if null

                                        //TODO: Recheck weather this action is 
correct

                                        if (msgCtx.getWSAAction() == null) {

@@ -282,55 +301,62 @@
                                                                
.getAxisOperation().getName().getLocalPart();

                                                msgCtx.setWSAAction(to + "/" + 
operationName);

                                        }

-                                       

+

                                        //processing the response

                                        processResponseMessage(rmMsgCtx, 
tempSequenceId,

                                                        messageNumber);

                                        

-                                       //Getting the mep.

-                                       String mep = 
msgCtx.getOperationDescription()

-                                                       
.getMessageExchangePattern();

-

-                                       if 
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {

-                                               //Add a sequence property to 
check weather the response has arrived.

-                                               SequencePropertyBean 
checkResponseBean = new SequencePropertyBean ();

-                                               
checkResponseBean.setSequenceId(msgCtx.getMessageID());

-                                               
checkResponseBean.setName(Constants.SequenceProperties.CHECK_RESPONSE);

-                                               
checkResponseBean.setValue("false");

-                                               
seqPropMgr.insert(checkResponseBean);

-                                       }

-                                       

-                                       //client side wait

-                                       boolean letGo = false;

-                                       while (!letGo) {

-                                               if 
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {

-                                                       //if the mep is in-out 
them wait till the response comes. then pause.

-                                                   SequencePropertyBean 
checkResponseBean = 
seqPropMgr.retrieve(msgCtx.getMessageID(),Constants.SequenceProperties.CHECK_RESPONSE);

-                                                   String val = (String) 
checkResponseBean.getValue();

-                                                   if 
("true".equals(checkResponseBean.getValue())) {

-                                                       
msgCtx.setPausedTrue(getName());

-                                                       letGo = true;

-                                                   }

-                                               } else {

-                                                       //FIXME - non-inout 
case.

-                                                       //if not in-out simply 
pause after the

-                                                       SequencePropertyBean 
outSequenceBean = seqPropMgr

-                                                                       
.retrieve(

-                                                                               
        tempSequenceId,

-                                                                               
        Constants.SequenceProperties.OUT_SEQUENCE_ID);

-                                                       if (outSequenceBean == 
null) {

-                                                               try {

-                                                                       
//Thread.sleep(Constants.CLIENT_SLEEP_TIME);

-                                                                       wait();

-                                                               } catch 
(InterruptedException e1) {

-                                                                       
System.out

-                                                                               
        .println("Client was interupted...");

-                                                               }

-                                                       } else {

-                                                               letGo = true;

-                                                       }

-                                               }

-                                       }

+                                       //pausing the message

+                                       msgCtx.setPausedTrue(getName());

+

+//                                     //Getting the mep.

+//                                     String mep = 
msgCtx.getOperationDescription()

+//                                                     
.getMessageExchangePattern();

+//

+//                                     if 
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {

+//                                             //Add a sequence property to 
check weather the response

+//                                             // has arrived.

+//                                             SequencePropertyBean 
checkResponseBean = new SequencePropertyBean();

+//                                             
checkResponseBean.setSequenceId(msgCtx.getMessageID());

+//                                             checkResponseBean

+//                                                             
.setName(Constants.SequenceProperties.CHECK_RESPONSE);

+//                                             
checkResponseBean.setValue(null);

+//                                             
seqPropMgr.insert(checkResponseBean);

+//                                     }

+//

+//                                     //client side wait

+//                                     boolean letGo = false;

+//                                     while (!letGo) {

+//                                             if 
(WSDLConstants.MEP_URI_IN_OUT.equals(mep)) {

+//                                                     //if the mep is in-out 
them wait till the response

+//                                                     // comes. then pause.

+//                                                     SequencePropertyBean 
checkResponseBean = seqPropMgr

+//                                                                     
.retrieve(

+//                                                                             
        msgCtx.getMessageID(),

+//                                                                             
        Constants.SequenceProperties.CHECK_RESPONSE);

+//                                                     MessageContext response 
= (MessageContext) checkResponseBean.getValue();

+//                                                     if 
(null!=checkResponseBean.getValue()) {

+//                                                             //simply return 
to the caller.

+//                                                             
//msgCtx.setConfigurationContext( properteies)

+//                                                             
//msgCtx.setTransportIn(response.getTransportIn());

+//                                                             
msgCtx.setProperty(MessageContext.TRANSPORT_IN,response.getProperty(MessageContext.TRANSPORT_IN));

+//                                                             
//msgCtx.setProperty(org.apache.axis2.Constants.tra)

+//                                                             
msgCtx.setPausedTrue(getName());

+//                                                             letGo = true;

+//                                                             continue;

+//                                                     }

+//                                             } else {

+//                                                     //FIXME - non-inout 
case.

+//                                                     //TODO check for the 
ack and pause.

+//                                             }

+//                                                     try {

+//                                                             waitOnKey();

+//                                                             

+//                                                     } catch 
(InterruptedException e1) {

+//                                                             
System.out.println("Client was interupted...");

+//                                                     }

+//

+//                                     }

 

                                }

                        }

@@ -349,11 +375,11 @@
                RMMsgContext createSeqRMMessage = 
RMMsgCreator.createCreateSeqMsg(

                                applicationRMMsg, tempSequenceId);

                MessageContext createSeqMsg = 
createSeqRMMessage.getMessageContext();

-               

-               

+

                //TODO remove below

-               //createSeqMsg.setReplyTo(new EndpointReference 
("http://localhost:9070/somethingWorking";));

-               

+               //createSeqMsg.setReplyTo(new EndpointReference

+               // ("http://localhost:9070/somethingWorking";));

+

                createSeqMsg.setRelatesTo(null); //create seq msg does not 
relateTo

                // anything

                AbstractContext context = applicationRMMsg.getContext();

@@ -387,13 +413,6 @@
 

                MessageContext msg = rmMsg.getMessageContext();

 

-//             //Changing message Id.

-//             //TODO remove this when Axis2 start sending uuids as uuid:xxxx

-//             String messageId = SandeshaUtil.getUUID();

-//             rmMsg.setMessageId(messageId);

-//             OperationContext opCtx = msg.getOperationContext();

-//             msg.getSystemContext().registerOperationContext(messageId, 
opCtx);

-

                if (rmMsg == null)

                        throw new SandeshaException("Message or reques message 
is null");

 

@@ -480,8 +499,10 @@
 

                } else {

                        //client side

-                       Object obj = msg.getProperty(Constants.LAST_MESSAGE);

-                       //if (obj != null && "true".equals(obj)) {

+

+                       Object obj = msg.getSystemContext().getProperty(

+                                       Constants.LAST_MESSAGE);

+                       if (obj != null && "true".equals(obj)) {

                                sequence.setLastMessage(new LastMessage());

                                //saving the last message no.

                                SequencePropertyBean lastOutMsgBean = new 
SequencePropertyBean(

@@ -489,7 +510,7 @@
                                                
Constants.SequenceProperties.LAST_OUT_MESSAGE,

                                                new Long(messageNumber));

                                sequencePropertyMgr.insert(lastOutMsgBean);

-                       //}

+                       }

                }

 

                //setting the Sequnece id.

@@ -514,26 +535,26 @@
                        throw new SandeshaException(e1.getMessage());

                }

 

-//             //send the message through sender only in the server case.

-//             //in the client case use the normal flow.

-//             if (msg.isServerSide()) {

-                       //Retransmitter bean entry for the application message

-                       RetransmitterBean appMsgEntry = new RetransmitterBean();

-                       String key = SandeshaUtil.storeMessageContext(rmMsg

-                                       .getMessageContext());

-                       appMsgEntry.setKey(key);

-                       appMsgEntry.setLastSentTime(0);

-                       appMsgEntry.setMessageId(rmMsg.getMessageId());

-                       appMsgEntry.setMessageNumber(messageNumber);

-                       if (outSequenceBean == null || 
outSequenceBean.getValue() == null) {

-                               appMsgEntry.setSend(false);

-                       } else {

-                               appMsgEntry.setSend(true);

+               //              //send the message through sender only in the 
server case.

+               //              //in the client case use the normal flow.

+               //              if (msg.isServerSide()) {

+               //Retransmitter bean entry for the application message

+               RetransmitterBean appMsgEntry = new RetransmitterBean();

+               String key = SandeshaUtil

+                               .storeMessageContext(rmMsg.getMessageContext());

+               appMsgEntry.setKey(key);

+               appMsgEntry.setLastSentTime(0);

+               appMsgEntry.setMessageId(rmMsg.getMessageId());

+               appMsgEntry.setMessageNumber(messageNumber);

+               if (outSequenceBean == null || outSequenceBean.getValue() == 
null) {

+                       appMsgEntry.setSend(false);

+               } else {

+                       appMsgEntry.setSend(true);

 

-                       }

-                       appMsgEntry.setTempSequenceId(tempSequenceId);

-                       retransmitterMgr.insert(appMsgEntry);

-//             }

+               }

+               appMsgEntry.setTempSequenceId(tempSequenceId);

+               retransmitterMgr.insert(appMsgEntry);

+               //              }

        }

 

        private long getNextMsgNo(ConfigurationContext context,


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Mon Oct 17 22:27:49 2005
@@ -21,6 +21,7 @@
 import java.util.Iterator;

 import java.util.List;

 

+import javax.xml.namespace.QName;

 import javax.xml.stream.FactoryConfigurationError;

 import javax.xml.stream.XMLOutputFactory;

 import javax.xml.stream.XMLStreamException;

@@ -35,6 +36,7 @@
 import org.apache.sandesha2.RMMsgContext;

 import org.apache.sandesha2.RMMsgCreator;

 import org.apache.sandesha2.SandeshaException;

+import org.apache.sandesha2.handlers.SandeshaOutHandler;

 import org.apache.sandesha2.storage.AbstractBeanMgrFactory;

 import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;

 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;

@@ -48,6 +50,11 @@
 

 public class AcknowledgementProcessor implements MsgProcessor {

 

+       public static void notifyAllWaitingOnKey () {

+       

+               SandeshaOutHandler.key.notifyAll();

+       }

+       

        public void processMessage(RMMsgContext rmMsgCtx) throws 
SandeshaException {

                

                SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) 
rmMsgCtx

@@ -61,6 +68,7 @@
 

                Iterator ackRangeIterator = 
sequenceAck.getAcknowledgementRanges()

                                .iterator();

+               

                Iterator nackIterator = sequenceAck.getNackList().iterator();

                String outSequenceId = 
sequenceAck.getIdentifier().getIdentifier();

                if (outSequenceId == null || "".equals(outSequenceId))

@@ -71,8 +79,6 @@
                SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory

                                
.getInstance(context).getSequencePropretyBeanMgr();

 

-

-

                SequencePropertyBean tempSequenceBean = seqPropMgr.retrieve(

                                outSequenceId,

                                Constants.SequenceProperties.TEMP_SEQUENCE_ID);

@@ -140,6 +146,9 @@
                }

 

                int i = 1;

+               

+               //stopping the progress of the message further.

+               rmMsgCtx.getMessageContext().setPausedTrue(new QName 
(Constants.IN_HANDLER_NAME));

        }

 

        private RetransmitterBean getRetransmitterEntry(Collection collection,


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Mon Oct 17 22:27:49 2005
@@ -41,6 +41,7 @@
 import org.apache.sandesha2.RMMsgCreator;

 import org.apache.sandesha2.SOAPAbstractFactory;

 import org.apache.sandesha2.SandeshaException;

+import org.apache.sandesha2.handlers.SandeshaOutHandler;

 import org.apache.sandesha2.msgreceivers.RMMessageReceiver;

 import org.apache.sandesha2.storage.AbstractBeanMgrFactory;

 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;

@@ -202,7 +203,9 @@
 

                long nextMsgno = bean.getNextMsgNoToProcess();

 

+               

                //FIXME - fix delivery assurances for the client side

+

                if (msgCtx.isServerSide()) {

                if (Constants.QOS.DeliveryAssurance.DEFAULT_DELIVERY_ASSURANCE 
== Constants.QOS.DeliveryAssurance.IN_ORDER) {

                        //pause the message

@@ -258,13 +261,14 @@
                }

                

                //client side - SET CheckResponse to true.

-               if (!msgCtx.isServerSide()) {

+               //FIXME this will not work. Even in client side inServerSide () 
is true for the messages.

+               //if (!msgCtx.isServerSide()) {

                        try {

                                MessageContext requestMessage = 
rmMsgCtx.getMessageContext().getOperationContext().getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);

                                String requestMessageId = 
requestMessage.getMessageID();

                                SequencePropertyBean checkResponseBean = 
seqPropMgr.retrieve(requestMessageId,Constants.SequenceProperties.CHECK_RESPONSE);

                                if (checkResponseBean!=null) {

-                                       checkResponseBean.setValue("true");

+                                       checkResponseBean.setValue(msgCtx);

                                        seqPropMgr.update(checkResponseBean);

                                }

                        

@@ -272,8 +276,14 @@
                                throw new SandeshaException (e.getMessage());

                        }

                        

+                       

+                       //SET THe RESPONSE

+                       

+                       //WAKE UP THE SLEEPING THREADS

+                       //SandeshaOutHandler.notifyAllWaitingOnKey();

+                       

                        //set 

-               }

+               //}

 

        }

 


Modified: 
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=326034&r1=326033&r2=326034&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java 
Mon Oct 17 22:27:49 2005
@@ -17,6 +17,7 @@
 package org.apache.sandesha2.util;

 

 import java.awt.datatransfer.StringSelection;

+import java.io.InputStream;

 import java.util.ArrayList;

 import java.util.Collection;

 import java.util.HashMap;

@@ -24,20 +25,31 @@
 import java.util.Iterator;

 import java.util.StringTokenizer;

 

+import javax.xml.stream.XMLInputFactory;

+import javax.xml.stream.XMLStreamReader;

+

 import org.apache.axis2.AxisFault;

 import org.apache.axis2.addressing.AddressingConstants;

 import org.apache.axis2.addressing.MessageInformationHeaders;

 import org.apache.axis2.addressing.miheaders.RelatesTo;

 import org.apache.axis2.context.ConfigurationContext;

 import org.apache.axis2.context.MessageContext;

+import org.apache.axis2.context.OperationContext;

 

 import org.apache.axis2.description.TransportInDescription;

 import org.apache.axis2.description.TransportOutDescription;

 import org.apache.axis2.engine.AxisEngine;

+import org.apache.axis2.i18n.Messages;

 import org.apache.axis2.om.OMElement;

 import org.apache.axis2.om.impl.MIMEOutputUtils;

+import org.apache.axis2.om.impl.llom.builder.StAXBuilder;

+import org.apache.axis2.om.impl.llom.builder.StAXOMBuilder;

 import org.apache.axis2.soap.SOAPEnvelope;

+import org.apache.axis2.soap.SOAPFactory;

+import org.apache.axis2.soap.impl.llom.builder.StAXSOAPModelBuilder;

+import org.apache.axis2.soap.impl.llom.soap11.SOAP11Factory;

 import org.apache.axis2.transport.http.HTTPConstants;

+import org.apache.axis2.transport.http.HTTPTransportUtils;

 import org.apache.axis2.util.UUIDGenerator;

 import org.apache.commons.httpclient.NameValuePair;

 import org.apache.sandesha2.Constants;

@@ -328,7 +340,57 @@
                return false;

        }

        

-//     public SOAPEnvelope cloneSOAPEnvelope (SOAPEnvelope oldEnvelope) {

-//             

-//     }

+       public static SOAPEnvelope createSOAPMessage (MessageContext 
msgContext, String soapNamespaceURI) throws AxisFault {

+        try {

+               

+            InputStream inStream = (InputStream) msgContext.getProperty(

+                    MessageContext.TRANSPORT_IN);

+            msgContext.setProperty(MessageContext.TRANSPORT_IN, null);

+            //this inputstram is set by the TransportSender represents a two 
way transport or

+            //by a Transport Recevier

+            if (inStream == null) {

+                throw new AxisFault(Messages.getMessage("inputstreamNull"));

+            }

+            

+            //This should be set later

+            //TODO check weather this affects MTOM

+            String contentType = null;

+

+            StAXBuilder builder = null;

+            SOAPEnvelope envelope = null;

+

+            String charSetEnc = 
(String)msgContext.getProperty(MessageContext.CHARACTER_SET_ENCODING);

+            if(charSetEnc == null) {

+               charSetEnc = MessageContext.DEFAULT_CHAR_SET_ENCODING;

+            }

+            

+                       if (contentType != null) {

+                msgContext.setDoingMTOM(true);

+                builder =

+                        HTTPTransportUtils.selectBuilderForMIME(msgContext,

+                                inStream,

+                                (String) contentType);

+                envelope = (SOAPEnvelope) builder.getDocumentElement();

+            } else if (msgContext.isDoingREST()) {

+                XMLStreamReader xmlreader =

+                        XMLInputFactory.newInstance().createXMLStreamReader(

+                                inStream,charSetEnc);

+                SOAPFactory soapFactory = new SOAP11Factory();

+                builder = new StAXOMBuilder(xmlreader);

+                builder.setOmbuilderFactory(soapFactory);

+                envelope = soapFactory.getDefaultEnvelope();

+                envelope.getBody().addChild(builder.getDocumentElement());

+            } else {

+                XMLStreamReader xmlreader =

+                        XMLInputFactory.newInstance().createXMLStreamReader(

+                                       inStream,charSetEnc);

+                builder = new StAXSOAPModelBuilder(xmlreader, 
soapNamespaceURI);

+                envelope = (SOAPEnvelope) builder.getDocumentElement();

+            }

+            return envelope;

+        } catch (Exception e) {

+            throw new AxisFault(e);

+        }

+

+       }

 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to