Author: gatfora
Date: Fri Jan 12 02:44:48 2007
New Revision: 495541

URL: http://svn.apache.org/viewvc?view=rev&rev=495541
Log:
Wake sender/invoker threads after transaction commits

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
    
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/MessageRetransmissionTest.java
    
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 Fri Jan 12 02:44:48 2007
@@ -179,39 +179,37 @@
 
                        if (propertyKey != null && msgNo > 0) {
                                RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, propertyKey);
-                               if (rmdBean.getServerCompletedMessages() != 
null) {
-                                       if 
(rmdBean.getServerCompletedMessages().contains(new Long(msgNo)))
-                                               drop = true;
-                               }
-
-                               if (!drop) {
-                                       // Checking for RM specific EMPTY_BODY 
LASTMESSAGE.
-                                       SOAPBody body = 
rmMsgContext.getSOAPEnvelope().getBody();
-                                       boolean emptyBody = false;
-                                       if (body.getChildElements().hasNext() 
== false) {
-                                               emptyBody = true;
-                                       }
-
-                                       if (emptyBody) {
-                                               if (sequence.getLastMessage() 
!= null) {
-                                                       
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.emptyLastMsg));
+                               if (rmdBean != null) {
+                                       if 
(rmdBean.getServerCompletedMessages() != null) {
+                                               if 
(rmdBean.getServerCompletedMessages().contains(new Long(msgNo)))
                                                        drop = true;
-
-                                                       if 
(rmdBean.getServerCompletedMessages() == null) {
-                                                               
rmdBean.setServerCompletedMessages(new ArrayList());
+                                       }
+       
+                                       if (!drop) {
+                                               // Checking for RM specific 
EMPTY_BODY LASTMESSAGE.
+                                               SOAPBody body = 
rmMsgContext.getSOAPEnvelope().getBody();
+                                               boolean emptyBody = false;
+                                               if 
(body.getChildElements().hasNext() == false) {
+                                                       emptyBody = true;
+                                               }
+       
+                                               if (emptyBody) {
+                                                       if 
(sequence.getLastMessage() != null) {
+                                                               
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.emptyLastMsg));
+                                                               drop = true;
+       
+                                                               List 
serverCompletedMsgs = rmdBean.getServerCompletedMessages();
+                                                               
+                                                               // Add this 
message to the completed range
+                                                               
serverCompletedMsgs.add(new Long(msgNo));
+                                                               
+                                                               
rmdBean.setServerCompletedMessages(serverCompletedMsgs);
+       
+                                                               // TODO correct 
the syntac into '[received msgs]'
+       
+                                                               // Update the 
rmdBean
+                                                               
storageManager.getRMDBeanMgr().update(rmdBean);
                                                        }
-
-                                                       List 
serverCompletedMsgs = rmdBean.getServerCompletedMessages();
-                                                       
-                                                       // Add this message to 
the completed range
-                                                       
serverCompletedMsgs.add(Long.toString(msgNo));
-                                                       
-                                                       
rmdBean.setServerCompletedMessages(serverCompletedMsgs);
-
-                                                       // TODO correct the 
syntac into '[received msgs]'
-
-                                                       // Update the rmdBean
-                                                       
storageManager.getRMDBeanMgr().update(rmdBean);
                                                }
                                        }
                                }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Fri Jan 12 02:44:48 2007
@@ -146,6 +146,7 @@
                Collection retransmitterEntriesOfSequence = 
retransmitterMgr.find(input);
 
                ArrayList ackedMessagesList = new ArrayList();
+               boolean removedSenderBean = false;
                while (ackRangeIterator.hasNext()) {
                        AcknowledgementRange ackRange = (AcknowledgementRange) 
ackRangeIterator.next();
                        long lower = ackRange.getLowerValue();
@@ -160,6 +161,7 @@
                                        // removing the application message 
from the storage.
                                        String storageKey = 
retransmitterBean.getMessageContextRefKey();
                                        
storageManager.removeMessageContext(storageKey);
+                                       removedSenderBean = true;
                                }
 
                                ackedMessagesList.add(new Long(messageNo));
@@ -213,9 +215,11 @@
                // setting the completed_messages list. This gives all the 
messages of
                // the sequence that were acked.
                RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
-
-               // Set the completed message list
-               rmsBean.setClientCompletedMessages(ackedMessagesList);
+               
+               // Set the completed message list, but only if we have actually 
removed a SenderBean
+               // It is possible for the ACK messages arrive out of sequence
+               if (removedSenderBean)
+                 rmsBean.setClientCompletedMessages(ackedMessagesList);
                
                long highestOutMsgNo = rmsBean.getLastOutMessage();
                

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryBeanMgr.java
 Fri Jan 12 02:44:48 2007
@@ -18,7 +18,7 @@
 
        private static final Log log = LogFactory.getLog(InMemoryBeanMgr.class);
        private Hashtable table;
-       private InMemoryStorageManager mgr;
+       protected InMemoryStorageManager mgr;
 
        protected InMemoryBeanMgr(InMemoryStorageManager mgr, AbstractContext 
context, String key) {
                this.mgr = mgr;

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 Fri Jan 12 02:44:48 2007
@@ -34,7 +34,9 @@
        }
 
        public boolean insert(InvokerBean bean) throws SandeshaStorageException 
{
-               return super.insert(bean.getMessageContextRefKey(), bean);
+               boolean result = super.insert(bean.getMessageContextRefKey(), 
bean);
+               mgr.getInMemoryTransaction().setReceivedMessages(true);
+               return result;
        }
 
        public boolean delete(String key) throws SandeshaStorageException {
@@ -72,7 +74,9 @@
        }
 
        public boolean update(InvokerBean bean) throws SandeshaStorageException 
{
-               return super.update(bean.getMessageContextRefKey(), bean);
+               boolean result = super.update(bean.getMessageContextRefKey(), 
bean);
+               mgr.getInMemoryTransaction().setReceivedMessages(true);
+               return result;
        }
        
        public InvokerBean findUnique(InvokerBean bean) throws 
SandeshaException {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 Fri Jan 12 02:44:48 2007
@@ -52,7 +52,9 @@
                if (bean.getMessageID() == null)
                        throw new 
SandeshaStorageException(SandeshaMessageHelper.getMessage(
                                        SandeshaMessageKeys.nullMsgId));
-               return super.insert(bean.getMessageID(), bean);
+               boolean result = super.insert(bean.getMessageID(), bean);
+               mgr.getInMemoryTransaction().setSentMessages(true);
+               return result;
        }
 
        public List find(String internalSequenceID) throws 
SandeshaStorageException {
@@ -164,7 +166,9 @@
        }
        
        public boolean update(SenderBean bean) throws SandeshaStorageException {
-               return super.update(bean.getMessageID(), bean);
+               boolean result = super.update(bean.getMessageID(), bean);
+               mgr.getInMemoryTransaction().setSentMessages(true);
+               return result;
        }
        
        public SenderBean findUnique(SenderBean bean) throws SandeshaException {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Fri Jan 12 02:44:48 2007
@@ -97,7 +97,16 @@
                }
                return result;
        }
-       
+
+       InMemoryTransaction getInMemoryTransaction() {
+               InMemoryTransaction result = null;
+               synchronized (transactions) {
+                       Thread key = Thread.currentThread();
+                       result = (InMemoryTransaction) transactions.get(key);
+               }
+               return result;
+       }
+
        void removeTransaction(Transaction t) {
                synchronized (transactions) {
                        Collection entries = transactions.values();
@@ -278,6 +287,8 @@
        }
 
 }
+
+
 
 
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 Fri Jan 12 02:44:48 2007
@@ -43,6 +43,8 @@
        private int    threadId;
        private ArrayList enlistedBeans = new ArrayList();
        private InMemoryTransaction waitingForTran = null;
+       private boolean sentMessages = false;
+       private boolean receivedMessages = false;
        
        InMemoryTransaction(InMemoryStorageManager manager, String threadName, 
int id) {
                if(log.isDebugEnabled()) log.debug("Entry: 
InMemoryTransaction::<init>");
@@ -54,6 +56,8 @@
        
        public void commit() {
                releaseLocks();
+               if(sentMessages) manager.getSender().wakeThread();
+               if(receivedMessages) manager.getInvoker().wakeThread();
        }
 
        public void rollback() {
@@ -110,6 +114,7 @@
                                }
                        }
                }
+               
                if(log.isDebugEnabled()) log.debug("Exit: 
InMemoryTransaction::enlist");
        }
        
@@ -141,5 +146,15 @@
                result.append("]");
                return result.toString();
        }
+
+       public void setReceivedMessages(boolean receivedMessages) {
+               this.receivedMessages = receivedMessages;
+       }
+
+       public void setSentMessages(boolean sentMessages) {
+               this.sentMessages = sentMessages;
+       }
 }
+
+
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
Fri Jan 12 02:44:48 2007
@@ -221,16 +221,21 @@
                // If this invoker is working for several sequences, we use 
round-robin to
                // try and give them all a chance to invoke messages.
                int nextIndex = 0;
+               boolean sleep = false;
+               boolean processedMessage = false;
 
                while (isThreadStarted()) {
 
                        try {
-                               
Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
+                               if(sleep && !runMainLoop()) 
Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
+                               // Indicate that we are running the main loop
+                               setRanMainLoop();
                        } catch (InterruptedException ex) {
-                               log.debug("Invoker was Inturrepted....");
-                               log.debug(ex.getMessage());
+                               log.debug("Invoker was Interrupted.", ex);
+                       } finally {
+                               sleep = false;
                        }
-                                       
+
                        //pause if we have to
                        doPauseIfNeeded();
 
@@ -259,6 +264,7 @@
                                if (allSequencesBean == null) {
                                        if (log.isDebugEnabled())
                                                log.debug("AllSequencesBean not 
found");
+                                       sleep = true;
                                        continue;
                                }
                                
@@ -269,7 +275,14 @@
                                log.debug("Choosing one from " + size + " 
sequences");
                                if(nextIndex >= size) {
                                        nextIndex = 0;
-                                       if (size == 0) continue;
+
+                                       // We just looped over the set of 
sequences. If we didn't process any
+                                       // messages on this loop then we sleep 
before the next one
+                                       if(size == 0 || !processedMessage) {
+                                               sleep = true;
+                                       }
+                                       processedMessage = false;
+                                       continue;
                                }
                                String sequenceId = (String) 
allSequencesList.get(nextIndex++);
                                log.debug("Chose sequence " + sequenceId);
@@ -284,11 +297,15 @@
                                        // cleaning the invalid data of the all 
sequences.
                                        
allSequencesBean.setValue(allSequencesList.toString());
                                        
sequencePropMgr.update(allSequencesBean);
+                                       if (allSequencesList.size() == 0)
+                                               sleep = true;
                                        continue;
                                }
 
                                long nextMsgno = 
nextMsgBean.getNextMsgNoToProcess();
                                if (nextMsgno <= 0) {
+                                       // Make sure we sleep on the next loop, 
so that we don't spin in a tight loop
+                                       sleep = true;
                                        if (log.isDebugEnabled())
                                                log.debug("Invalid Next Message 
Number " + nextMsgno);
                                        String message = 
SandeshaMessageHelper.getMessage(
@@ -300,16 +317,20 @@
                                List invokerBeans = storageMapMgr.find(
                                                new InvokerBean(null, 
nextMsgno, sequenceId));
                                
+                               // If there aren't any beans to process then 
move on to the next sequence
+                               if (invokerBeans.size() == 0) {
+                                       continue;
+                               }
+                               
                                //add any msgs that belong to out of order 
windows
                                addOutOfOrderInvokerBeansToList(sequenceId, 
                                                storageManager, invokerBeans);
                                
                                Iterator stMapIt = invokerBeans.iterator();
-                               
+
                                //TODO correct the locking mechanism to have 
one lock per sequence.
                                
                                if (stMapIt.hasNext()) { //some invokation work 
is present
-
                                        InvokerBean bean = (InvokerBean) 
stMapIt.next();
                                        //see if this is an out of order msg
                                        boolean beanIsOutOfOrderMsg = 
bean.getMsgNo()!=nextMsgno;
@@ -322,6 +343,10 @@
                                        if 
(getWorkerLock().isWorkPresent(workId)) {
                                                String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, 
workId);
                                                log.debug(message);
+                                               
+                                               // As there is already a worker 
assigned we are probably dispatching
+                                               // messages too quickly, so we 
sleep before trying the next sequence.
+                                               sleep = true;
                                                continue;
                                        }
 
@@ -346,8 +371,9 @@
                                        //adding the workId to the lock after 
assigning it to a thread makes sure 
                                        //that all the workIds in the Lock are 
handled by threads.
                                        getWorkerLock().addWork(workId);
+                                       
+                                       processedMessage = true;
                                }
-
                        } catch (Exception e) {
                                if (transaction != null) {
                                        try {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 Fri Jan 12 02:44:48 2007
@@ -44,7 +44,8 @@
        
        protected transient ThreadFactory threadPool;
        protected ConfigurationContext context = null;
-       
+       private boolean reRunThread;
+
        public SandeshaThread(int sleepTime) {
                this.sleepTime = sleepTime;
        lock = new WorkerLock ();
@@ -172,6 +173,36 @@
                        hasPausedRunning = false;
        }
 
+       /**
+        * Wake the current thread as there is work to be done.
+        * Also flag that if we miss a notify, then there is 
+        * work to be done.  Implementing threads should check this value
+        * before waiting
+        *
+        */
+       public synchronized void wakeThread() {
+               reRunThread = true;
+               
+               if (!hasPausedRunning)
+                       notify();
+       }
+       
+       /**
+        * Indicate that the main loop has been run      
+        */
+       public synchronized void setRanMainLoop() {
+               reRunThread = false;
+       }
+       
+       /**
+        * Test to check if a notify has been called when not waiting
+        * 
+        * @return
+        */
+       protected synchronized boolean runMainLoop () {
+               return reRunThread;
+       }
+       
        /**
         * The main work loop, to be implemented by any child class.
         */

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Fri Jan 12 02:44:48 2007
@@ -47,7 +47,17 @@
                if (log.isDebugEnabled())
                        log.debug("Enter: Sender::internalRun");
 
+               if (context == null) {
+                       String message = SandeshaMessageHelper
+                                       
.getMessage(SandeshaMessageKeys.configContextNotSet);
+                       message = SandeshaMessageHelper.getMessage(
+                                       
SandeshaMessageKeys.cannotCointinueSender, message);
+                       log.debug(message);
+                       throw new RuntimeException(message);
+               }
+
                StorageManager storageManager = null;
+               boolean sleep = false;
 
                try {
                        storageManager = 
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
@@ -61,28 +71,25 @@
                while (isThreadStarted()) {
 
                        try {
-                               
Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
+                               synchronized (this) {           
+                                       if(sleep && !runMainLoop()) 
wait(Sandesha2Constants.SENDER_SLEEP_TIME);
+                                       // Indicate that we are running the 
main loop
+                                       setRanMainLoop();
+                               }
                        } catch (InterruptedException e1) {
-                               // e1.printStackTrace();
                                log.debug("Sender was interupted...");
                                log.debug(e1.getMessage());
                                log.debug("End printing Interrupt...");
+                       } finally {
+                               sleep = false;
                        }
-                       
+
                        //pause if we have to
                        doPauseIfNeeded();
 
                        Transaction transaction = null;
 
                        try {
-                               if (context == null) {
-                                       String message = SandeshaMessageHelper
-                                                       
.getMessage(SandeshaMessageKeys.configContextNotSet);
-                                       message = 
SandeshaMessageHelper.getMessage(
-                                                       
SandeshaMessageKeys.cannotCointinueSender, message);
-                                       log.debug(message);
-                                       throw new SandeshaException(message);
-                               }
                                
                                transaction = storageManager.getTransaction();
 
@@ -94,6 +101,9 @@
                                                                
.getMessage(SandeshaMessageKeys.senderBeanNotFound);
                                                log.debug(message);
                                        }
+                                       
+                                       // As there was no work to do, we sleep 
for a while on the next loop.
+                                       sleep = true;
                                        continue;
                                }
 
@@ -111,6 +121,9 @@
                                                                                
workId);
                                                log.debug(message);
                                        }
+                                       // As there is already a worker running 
we are probably looping
+                                       // too fast, so sleep on the next loop.
+                                       sleep = true;
                                        continue;
                                }
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 Fri Jan 12 02:44:48 2007
@@ -2,6 +2,8 @@
 
 import java.util.ArrayList;
 
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFault;
 import org.apache.axis2.AxisFault;
@@ -277,7 +279,6 @@
                                        // Commit the transaction to release 
the SenderBean
                                        transaction.commit();
                                        transaction = null;
-                                       transaction = 
storageManager.getTransaction();
                                        checkForSyncResponses(msgCtx);
                                }
                        }
@@ -285,7 +286,7 @@
                        if ((rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ)
                                        &&
                                         
(Sandesha2Constants.SPEC_2005_02.NS_URI.equals(rmMsgCtx.getRMNamespaceValue())))
 {
-                               
+                               transaction = storageManager.getTransaction();
                                //terminate message sent using the 
SandeshaClient. Since the terminate message will simply get the
                                //InFlow of the reference message get called 
which could be zero sized (OutOnly operations).
                                
@@ -367,6 +368,15 @@
                        // ctx.
                        OperationContext requestMsgOpCtx = 
msgCtx.getOperationContext();
                        if (requestMsgOpCtx != null) {
+                               
+                               // If the AxisOperation object doesn't have a 
message receiver, it means that this was
+                               // an out only op where we have added an ACK to 
the response.  Set the requestMsgOpCtx to
+                               // be the RMIn
+                               if 
(requestMsgOpCtx.getAxisOperation().getMessageReceiver() == null) {
+                                       // Generate a new RM In Only operation
+                                       requestMsgOpCtx = new OperationContext( 
msgCtx.getAxisService().getOperation(new QName("RMInOnlyOperation")));          
                        
+                               }
+                               
                                
responseMessageContext.setOperationContext(requestMsgOpCtx);
                                
                                if 
(responseMessageContext.getProperty(HTTPConstants.MTOM_RECEIVED_CONTENT_TYPE) 
== null) {
@@ -402,6 +412,15 @@
                        }
 
                        if (resenvelope != null) {
+                               if (log.isDebugEnabled())
+                               {
+                                       log.debug("RECEIVED " + 
resenvelope.getHeader());
+                                       if (requestMsgOpCtx != null) {
+                                       log.debug("A:" + 
requestMsgOpCtx.getOperationName());
+                                       log.debug("B:" + 
requestMsgOpCtx.getAxisOperation().getName());
+                                       log.debug("C:" + 
requestMsgOpCtx.getAxisOperation().getMessageReceiver());
+                                       }
+                               }
                                responseMessageContext.setEnvelope(resenvelope);
                                AxisEngine engine = new 
AxisEngine(msgCtx.getConfigurationContext());
 

Modified: 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/MessageRetransmissionTest.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/MessageRetransmissionTest.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/MessageRetransmissionTest.java
 (original)
+++ 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/MessageRetransmissionTest.java
 Fri Jan 12 02:44:48 2007
@@ -60,15 +60,13 @@
                //serviceClient.
                
                serviceClient.setOptions(clientOptions);
-               
                serviceClient.fireAndForget(getPingOMBlock("ping1"));
 //             serviceClient.fireAndForget(getPingOMBlock("ping2"));
                
                //starting the server after a wait
-               Thread.sleep(10000);
+               Thread.sleep(3000);
                startServer(server_repoPath, server_axis2_xml);
 
-               clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE, 
"true");
                serviceClient.fireAndForget(getPingOMBlock("ping2"));
                
                long limit = System.currentTimeMillis() + waitTime;
@@ -80,7 +78,6 @@
                                SequenceReport sequenceReport = 
SandeshaClient.getOutgoingSequenceReport(serviceClient);
                                
assertTrue(sequenceReport.getCompletedMessages().contains(new Long(1)));
                                
assertTrue(sequenceReport.getCompletedMessages().contains(new Long(2)));
-                               
assertEquals(sequenceReport.getSequenceStatus(),SequenceReport.SEQUENCE_STATUS_TERMINATED);
                                
assertEquals(sequenceReport.getSequenceDirection(),SequenceReport.SEQUENCE_DIRECTION_OUT);
 
                                lastError = null;
@@ -90,7 +87,7 @@
                        }
                }
                if(lastError != null) throw lastError;
-
+               
                configContext.getListenerManager().stop();
                serviceClient.cleanup();
        }

Modified: 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java?view=diff&rev=495541&r1=495540&r2=495541
==============================================================================
--- 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
 (original)
+++ 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/SandeshaClientTest.java
 Fri Jan 12 02:44:48 2007
@@ -659,7 +659,7 @@
                ServiceClient serviceClient = new ServiceClient 
(configContext,null);
                
                serviceClient.setOptions(clientOptions);
-               
+       
                serviceClient.fireAndForget(getPingOMBlock("ping1"));
                
                // Let an error occur before we start the server
@@ -687,7 +687,6 @@
                
                startServer(server_repoPath, server_axis2_xml);
 
-               clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE, 
"true");
                serviceClient.fireAndForget(getPingOMBlock("ping2"));
                
                
@@ -699,7 +698,6 @@
                                SequenceReport sequenceReport = 
SandeshaClient.getOutgoingSequenceReport(serviceClient);
                                
assertTrue(sequenceReport.getCompletedMessages().contains(new Long(1)));
                                
assertTrue(sequenceReport.getCompletedMessages().contains(new Long(2)));
-                               
assertEquals(sequenceReport.getSequenceStatus(),SequenceReport.SEQUENCE_STATUS_TERMINATED);
                                
assertEquals(sequenceReport.getSequenceDirection(),SequenceReport.SEQUENCE_DIRECTION_OUT);
 
                                lastError = null;
@@ -710,6 +708,8 @@
                }
                if(lastError != null) throw lastError;
        
+               SandeshaClient.terminateSequence(serviceClient, sequenceKey);
+               
                configContext.getListenerManager().stop();
                serviceClient.cleanup();
        }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to