Author: gatfora
Date: Tue Nov  6 01:05:28 2007
New Revision: 592342

URL: http://svn.apache.org/viewvc?rev=592342&view=rev
Log:
As described in SANDESHA2-108 this change avoids the Inorder thread switch if 
the message being received is the next message to be invoked.  This change 
requires Axis2 revision 592132

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
    
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
 Tue Nov  6 01:05:28 2007
@@ -942,16 +942,8 @@
                        StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configContext, 
configContext.getAxisConfiguration());
                        reportTransaction = storageManager.getTransaction();
 
-                       //only do this if we are running inOrder
-                       
if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
-                               Invoker invoker = 
(Invoker)SandeshaUtil.getSandeshaStorageManager(configContext, 
configContext.getAxisConfiguration()).getInvoker();
-                               if (invoker==null){
-                                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(
-                                               
SandeshaMessageKeys.invokerNotFound, sequenceID));
-                               }
-                               
-                               
invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, 
allowLaterDeliveryOfMissingMessages);                    
-                       }
+                       // There will only be messages waiting if we are 
running in-order
+                       
Invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID, 
allowLaterDeliveryOfMissingMessages);                    
                        
                        if(reportTransaction != null && 
reportTransaction.isActive()) reportTransaction.commit();
                        reportTransaction = null;

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 Tue Nov  6 01:05:28 2007
@@ -220,9 +220,7 @@
             
                boolean isDuplicate = true;
                //still allow this msg if we have no corresponding invoker bean 
for it and we are inOrder
-               boolean isInOrder = 
-                       
SandeshaUtil.getDefaultPropertyBean(rmMsgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-               if(isInOrder)
+               if(SandeshaUtil.isInOrder(rmMsgCtx.getMessageContext()))
                {
                InvokerBean finderBean = new InvokerBean();
                finderBean.setMsgNo(msgNo);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Tue Nov  6 01:05:28 2007
@@ -210,7 +210,6 @@
        public final static String elementMustForSpec = "elementMustForSpec";
        public final static String couldNotSendCreateSeqResponse = 
"couldNotSendCreateSeqResponse";
        public final static String invalidElementFoundWithinElement = 
"invalidElementFoundWithinElement";
-       public final static String invokerNotFound="invokerNotFound";
            
        public final static String 
couldNotSendCloseResponse="couldNotSendCloseResponse";
        

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Tue Nov  6 01:05:28 2007
@@ -46,7 +46,6 @@
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -59,7 +58,7 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.InvokerWorker;
 import org.apache.sandesha2.wsrm.Sequence;
 
 /**
@@ -135,7 +134,7 @@
                }
                
                // setting acked msg no range
-               ConfigurationContext configCtx = 
rmMsgCtx.getMessageContext().getConfigurationContext();
+               ConfigurationContext configCtx = 
msgCtx.getConfigurationContext();
                if (configCtx == null) {
                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
                        log.debug(message);
@@ -202,9 +201,9 @@
                }
                
                String specVersion = rmMsgCtx.getRMSpecVersion();
-               if 
((SandeshaUtil.isDuplicateInOnlyMessage(rmMsgCtx.getMessageContext())
+               if ((SandeshaUtil.isDuplicateInOnlyMessage(msgCtx)
                                                ||
-                                       
SandeshaUtil.isDuplicateInOutMessage(rmMsgCtx.getMessageContext()))
+                                       
SandeshaUtil.isDuplicateInOutMessage(msgCtx))
                                && 
(Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == 
Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
                        
                        // this is a duplicate message and the invocation type 
is EXACTLY_ONCE. We try to return
@@ -353,11 +352,11 @@
                        // If the MEP doesn't need the backchannel, and nor do 
we, we should signal it so that it
                        // can close off as soon as possible.
                        if (backchannelFree) {
+                               TransportUtils.setResponseWritten(msgCtx, 
false);
+
                                RequestResponseTransport t = null;
                                t = (RequestResponseTransport) 
rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-
                                if(t != null && 
RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
-                                       
TransportUtils.setResponseWritten(msgCtx, false);
                                        t.acknowledgeMessage(msgCtx);
                                }
                        }
@@ -377,33 +376,40 @@
                        }
                }
                
-               // If the storage manager has an invoker, then they may be 
implementing inOrder, or
-               // transactional delivery. Either way, if they have one we 
should use it.
-               SandeshaThread invoker = storageManager.getInvoker();
-               if (invoker != null) {
-                       // Whatever the MEP, we stop processing here and the 
invoker will do the real work. We only
-                       // SUSPEND if we need to keep the backchannel open for 
the response... we may as well ABORT
-                       // to let other cases end more quickly.
-                       if(backchannelFree && ackBackChannel) {
-                               result = InvocationResponse.ABORT;
-                       } else {
-                               result = InvocationResponse.SUSPEND;
-                       }
+               // If the storage manager is implementing inOrder, or using 
transactional delivery
+               // then we should hand the message over to the invoker thread. 
If not, we can invoke
+               // it directly ourselves.
+               InvokerWorker worker = null;
+               if (SandeshaUtil.isInOrder(msgCtx) || 
storageManager.hasUserTransaction(msgCtx)) {
                    
-                       InvokerBeanMgr storageMapMgr = 
storageManager.getInvokerBeanMgr();
-
                        InvokerBean invokerBean = new InvokerBean(key, msgNo, 
sequenceId);
-                       
                        ContextManager contextMgr = 
SandeshaUtil.getContextManager(configCtx);
+
                        if(contextMgr != null) 
invokerBean.setContext(contextMgr.storeContext());
 
-                       boolean wasAdded = storageMapMgr.insert(invokerBean);
+                       boolean wasAdded = 
storageManager.getInvokerBeanMgr().insert(invokerBean);
 
                        // This will avoid performing application processing 
more than once.
                        
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
                        
+                       // Whatever the MEP, we stop processing here and the 
invoker will do the real work. As we
+                       // are taking responsibility for the message we need to 
return SUSPEND
+                       result = InvocationResponse.SUSPEND;
+            
                        if (wasAdded) {
-                               storageManager.storeMessageContext(key, 
rmMsgCtx.getMessageContext());        
+                               storageManager.storeMessageContext(key, msgCtx);
+                               // We can invoke the message immediately, if 
this is the next message to invoke,
+                               // and we don't have a user transaction in play.
+                               if(bean.getNextMsgNoToProcess() == msgNo && 
!storageManager.hasUserTransaction(msgCtx)) {
+                                       String workId = sequenceId;
+                                       ConfigurationContext context = 
msgCtx.getConfigurationContext();
+                                       
+                                       worker = new InvokerWorker(context, 
invokerBean);
+                                       worker.setWorkId(workId);
+                                       
+                                       // Actually take the lock
+                                       worker.getLock().addWork(workId, 
worker);
+                               }
                        } else {
                                // Abort this message immediately as this 
message has already been added
                                sendAck = false;
@@ -422,6 +428,14 @@
                if (transaction != null && transaction.isActive()) 
                        transaction.commit();
                
+               if(worker != null) {
+                       try {
+                               worker.run();
+                       } catch(Exception e)  {
+                               log.error("Caught exception running 
InvokerWorker", e);
+                       }
+               }
+
                if (sendAck) {
                        try {
                                transaction = storageManager.getTransaction();
@@ -429,6 +443,15 @@
                                RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, 
storageManager,true);
                                
AcknowledgementManager.sendAckNow(ackRMMsgContext);
                                TransportUtils.setResponseWritten(msgCtx, true);
+                               RequestResponseTransport t = 
+                                       (RequestResponseTransport) 
rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+                               
+                               // Tell the transport that we have finished 
with the message as the response should have been
+                               // written
+                               if(t != null && 
RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+                                       t.signalResponseReady();
+                               }
+
                                if (transaction != null && 
transaction.isActive()) transaction.commit();
                                transaction = null;
                        

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
 Tue Nov  6 01:05:28 2007
@@ -33,9 +33,7 @@
        }
 
        public boolean insert(InvokerBean bean) throws SandeshaStorageException 
{
-               boolean result = super.insert(bean.getMessageContextRefKey(), 
bean);
-               mgr.getInMemoryTransaction().setReceivedMessages(true);
-               return result;
+               return super.insert(bean.getMessageContextRefKey(), bean);
        }
 
        public boolean delete(String key) throws SandeshaStorageException {
@@ -51,9 +49,7 @@
        }
        
        public boolean update(InvokerBean bean) throws SandeshaStorageException 
{
-               boolean result = super.update(bean.getMessageContextRefKey(), 
bean);
-               mgr.getInMemoryTransaction().setReceivedMessages(true);
-               return result;
+               return super.update(bean.getMessageContextRefKey(), bean);
        }
        
        public InvokerBean findUnique(InvokerBean bean) throws 
SandeshaException {

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Tue Nov  6 01:05:28 2007
@@ -48,7 +48,6 @@
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.workers.Invoker;
 import org.apache.sandesha2.workers.SandeshaThread;
 import org.apache.sandesha2.workers.Sender;
 
@@ -62,7 +61,6 @@
     private SenderBeanMgr senderBeanMgr = null;
     private InvokerBeanMgr invokerBeanMgr = null;
     private Sender sender = null;
-    private Invoker invoker = null;
     private PollingManager pollingManager = null;
     private HashMap transactions = new HashMap();
     private boolean useSerialization = false;
@@ -76,10 +74,6 @@
                SandeshaPolicyBean policy = 
SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
                useSerialization = policy.isUseMessageSerialization();
                
-               // Note that while inOrder is a global property we can decide 
if we need the
-               // invoker thread at this point. If we change this to be a 
sequence-level
-               // property then we'll need to revisit this.
-               boolean inOrder = policy.isInOrder();
                boolean polling = policy.isEnableMakeConnection();
                
                this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
@@ -87,7 +81,6 @@
                this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
                this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, 
context);
                this.sender = new Sender();
-               if(inOrder) this.invoker = new Invoker();
                if(polling) this.pollingManager = new PollingManager();
        }
 
@@ -135,7 +128,7 @@
         * Gets the Invoker for this Storage manager
         */
        public SandeshaThread getInvoker() {
-         return invoker;
+         return null;
        }
 
        /** 
@@ -364,6 +357,7 @@
                SOAPEnvelope   envelope;
        }
 }
+
 
 
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 Tue Nov  6 01:05:28 2007
@@ -28,7 +28,6 @@
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMBean;
-import org.apache.sandesha2.workers.SandeshaThread;
 
 /**
  * This class does not really implement transactions, but it is a good
@@ -45,7 +44,6 @@
        private ArrayList enlistedBeans = new ArrayList();
        private InMemoryTransaction waitingForTran = null;
        private boolean sentMessages = false;
-       private boolean receivedMessages = false;
        private boolean active = true;
        
        InMemoryTransaction(InMemoryStorageManager manager, String threadName, 
int id) {
@@ -59,10 +57,6 @@
        public void commit() {
                releaseLocks();
                if(sentMessages) manager.getSender().wakeThread();
-               if(receivedMessages) {
-                       SandeshaThread invoker = manager.getInvoker();
-                       if(invoker != null) invoker.wakeThread();
-               }
                active = false;
        }
 
@@ -164,14 +158,11 @@
                return result.toString();
        }
 
-       public void setReceivedMessages(boolean receivedMessages) {
-               this.receivedMessages = receivedMessages;
-       }
-
        public void setSentMessages(boolean sentMessages) {
                this.sentMessages = sentMessages;
        }
 }
+
 
 
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 Tue Nov  6 01:05:28 2007
@@ -1169,5 +1169,14 @@
                return epr;
        }
 
+       public static boolean isInOrder(MessageContext context) throws 
SandeshaException {
+               if (log.isDebugEnabled()) log.debug("Enter: 
SandeshaUtil::isInOrder");
+               
+               SandeshaPolicyBean policy = 
getPropertyBean(context.getAxisOperation());
+               boolean result = policy.isInOrder();
+               
+               if (log.isDebugEnabled()) log.debug("Enter: 
SandeshaUtil::isInOrder, " + result);
+               return result;
+       }
 
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
 Tue Nov  6 01:05:28 2007
@@ -68,12 +68,13 @@
         * Otherwise messages skipped over will be ignored
         * @throws SandeshaException
         */
-       public synchronized void 
forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
+       public static synchronized void 
forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
                        String sequenceID,
                        boolean allowLaterDeliveryOfMissingMessages)throws 
SandeshaException{
-               //first we block while we wait for the invoking thread to pause
-               blockForPause();
+//             //first we block while we wait for the invoking thread to pause
+//             blockForPause();
                try{
+                       StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(ctx, ctx.getAxisConfiguration());
                        //get all invoker beans for the sequence
                        InvokerBeanMgr storageMapMgr = storageManager
                                        .getInvokerBeanMgr();
@@ -102,33 +103,23 @@
                                                InvokerBean invoker = 
(InvokerBean)stMapIt.next();
                                                
                                                // start a new worker thread 
and let it do the invocation.
-                                               String workId = sequenceID + 
"::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
-                                          //piece of work that will be 
assigned to the Worker.
+                                               String workId = sequenceID;
                                                
-                                               String messageContextKey = 
invoker.getMessageContextRefKey();
-                                               InvokerWorker worker = new 
InvokerWorker(context,
-                                                               
messageContextKey, 
-                                                               true); //want 
to ignore the enxt msg number
-                                               
-                                               worker.setLock(getWorkerLock());
+                                               InvokerWorker worker = new 
InvokerWorker(ctx, invoker);
+                                               worker.forceOutOfOrder();
+                                               worker.setPooled();
                                                worker.setWorkId(workId);
                                                
                                                // Wrap the invoker worker with 
the correct context, if needed.
                                                Runnable work = worker;
-                                               ContextManager contextMgr = 
SandeshaUtil.getContextManager(context);
+                                               ContextManager contextMgr = 
SandeshaUtil.getContextManager(ctx);
                                                if(contextMgr != null) {
                                                        work = 
contextMgr.wrapWithContext(work, invoker.getContext());
                                                }
                                                
-                                               try {
-                                                       // Try and set the lock 
up before we start the thread, but roll it back
-                                                       // if we hit any 
problems
-                                                       
if(worker.getLock().addWork(workId, worker)){
-                                                               
threadPool.execute(work);
-                                                       }
-                                               } catch(Exception e) {
-                                                       
worker.getLock().removeWork(workId);
-                                               }
+                                               // Setup the lock for the new 
worker
+                                               
worker.getLock().addWork(workId, worker);
+                                               
ctx.getThreadPool().execute(work);
 
                                                long msgNumber = 
invoker.getMsgNo();
                                                //if necessary, update the 
"next message number" bean under this transaction
@@ -176,8 +167,8 @@
                        }
                }
                finally{
-                       //restart the invoker
-                       finishPause();
+//                     //restart the invoker
+//                     finishPause();
                }
        }
 
@@ -311,9 +302,7 @@
                                //see if this is an out of order msg
                                boolean beanIsOutOfOrderMsg = 
bean.getMsgNo()!=nextMsgno;
                                
-                               String workId = sequenceId + "::" + 
bean.getMsgNo(); 
-                                                                               
                                                        //creating a workId to 
uniquely identify the
-                                                                               
                                           //piece of work that will be 
assigned to the Worker.
+                               String workId = sequenceId; 
                                                                        
                                //check whether the bean is already assigned to 
a worker.
                                if (getWorkerLock().isWorkPresent(workId)) {
@@ -331,20 +320,15 @@
                                        return sleep;
                                }
 
-                               String messageContextKey = 
bean.getMessageContextRefKey();
-                               
                                if(transaction != null) {
                                        transaction.commit();
                                        transaction = null;
                                }
 
                                // start a new worker thread and let it do the 
invocation.
-                               InvokerWorker worker = new 
InvokerWorker(context,
-                                               messageContextKey, 
-                                               beanIsOutOfOrderMsg); //only 
ignore nextMsgNumber if the bean is an
-                                                                               
                                                        //out of order message
-                               
-                               worker.setLock(getWorkerLock());
+                               InvokerWorker worker = new 
InvokerWorker(context, bean);
+                               if(beanIsOutOfOrderMsg) 
worker.forceOutOfOrder();
+                               worker.setPooled();
                                worker.setWorkId(workId);
                                
                                // Wrap the invoker worker with the correct 
context, if needed.

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 Tue Nov  6 01:05:28 2007
@@ -17,6 +17,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.context.ContextManager;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
@@ -30,21 +31,161 @@
 
 public class InvokerWorker extends SandeshaWorker implements Runnable {
 
-       ConfigurationContext configurationContext = null;
-       String messageContextKey;
-       boolean ignoreNextMsg = false;
+       static final Log log = LogFactory.getLog(InvokerWorker.class);
+       static final WorkerLock lock = new WorkerLock();
        
-       Log log = LogFactory.getLog(InvokerWorker.class);
+       private ConfigurationContext configurationContext;
+       private String  sequence;
+       private long    messageNumber;
+       private String  messageContextKey;
+       private boolean ignoreNextMsg;
+       private boolean pooledThread;
        
-       public InvokerWorker (ConfigurationContext configurationContext, String 
messageContextKey, boolean ignoreNextMsg) {
+       public InvokerWorker (ConfigurationContext configurationContext, 
InvokerBean bean) {
+               // All invoker workers need to use the same lock, so we point 
to the static one here.
+               this.setLock(lock);
+               
                this.configurationContext = configurationContext;
-               this.messageContextKey = messageContextKey;
-               this.ignoreNextMsg = ignoreNextMsg;
+               initializeFromBean(bean);
        }
        
+       public void forceOutOfOrder() {
+               if(log.isDebugEnabled()) log.debug("Enter: 
InvokerWorker::forceOutOfOrder");
+               ignoreNextMsg = true;
+               if(log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::forceOutOfOrder");
+       }
+
+       public void setPooled() {
+               if(log.isDebugEnabled()) log.debug("Enter: 
InvokerWorker::setPooled");
+               pooledThread = true;
+               if(log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::setPooled");
+       }
+
+       private void initializeFromBean(InvokerBean bean) {
+               if(log.isDebugEnabled()) log.debug("Enter: 
InvokerWorker::initializeFromBean " + bean);
+               
+               this.sequence = bean.getSequenceID();
+               this.messageNumber = bean.getMsgNo();
+               this.messageContextKey = bean.getMessageContextRefKey();
+               
+               if(log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::initializeFromBean");
+       }
+               
+       /**
+        * The run method invokes the message that this invoker has been primed 
with, but will
+        * also attempt to invoke subsequent messages. If the invoker worker is 
running on the
+        * application thread then we move on to a thread pool for the second 
message, but if
+        * we are already on a pooled thread then we just continue.
+        */
        public void run() {
-               if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run");
+               if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, 
message " + messageNumber + ", sequence " + sequence);
+               
+               // If we are not the holder of the correct lock, then we have 
to stop
+               if(lock != null && !lock.ownsLock(workId, this)) {
+                       if (log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::run, another worker holds the lock");
+                       return;
+               }
+               
+               Transaction tran = null;
+               try {
+                       InvokerWorker nextWorker = null;
+                       Runnable nextRunnable = null;
+
+                       // Invoke the first message
+                       invokeMessage(null);
+
+                       // Look for the next message, so long as we are still 
processing normally
+                       while(!ignoreNextMsg) {
+                               InvokerBean finder = new InvokerBean();
+                               finder.setSequenceID(sequence);
+                               finder.setMsgNo(messageNumber + 1);
+
+                               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+                               tran = storageManager.getTransaction();
+
+                               InvokerBeanMgr mgr = 
storageManager.getInvokerBeanMgr();
+                               InvokerBean nextBean = mgr.findUnique(finder);
+
+                               if(nextBean != null) {
+                                       if(pooledThread) {
+                                               initializeFromBean(nextBean);
+                                               final Transaction theTran = 
tran;
+                                               Runnable work = new Runnable() {
+                                                       public void run() {
+                                                               
invokeMessage(theTran);
+                                                       }
+                                               };
+
+                                               // Wrap the work with the 
correct context, if needed.
+                                               ContextManager contextMgr = 
SandeshaUtil.getContextManager(configurationContext);
+                                               if(contextMgr != null) {
+                                                       work = 
contextMgr.wrapWithContext(work, nextBean.getContext());
+                                               }
+
+                                               // Finally do the work
+                                               work.run();
+
+                                               tran = null;
+                                       } else {
+                                               nextWorker = new 
InvokerWorker(configurationContext, nextBean);
+                                               nextWorker.setPooled();
+                                               nextWorker.setWorkId(workId);
+
+                                               // Wrap the invoker worker with 
the correct context, if needed.
+                                               ContextManager contextMgr = 
SandeshaUtil.getContextManager(configurationContext);
+                                               if(contextMgr != null) {
+                                                       nextRunnable = 
contextMgr.wrapWithContext(nextWorker, nextBean.getContext());
+                                               } else {
+                                                       nextRunnable = 
nextWorker;
+                                               }
+                                       }
+                               }
                
+                               // Clean up the tran, in case we didn't pass it 
into the invoke method
+                               if(tran != null) tran.commit();
+                               tran = null;
+                                               
+                               if(nextBean == null || nextWorker != null) {
+                                       // We have run out of work, or the new 
worker has taken it on, so we can
+                                       // break out of the loop
+                                       break;
+                               }
+                       }
+                                       
+                       if (workId !=null && lock!=null) {
+                               lock.removeWork(workId);
+                       }
+
+                       // If we created another worker, set it running now 
that we have released the lock
+                       if(nextWorker != null) {
+                               lock.addWork(workId, nextWorker);
+                               
configurationContext.getThreadPool().execute(nextRunnable);
+                       }
+
+               } catch(SandeshaException e) {
+                       log.debug("Exception within InvokerWorker", e);
+
+                       // Clean up the tran, if there is one left
+                       if(tran != null) {
+                               try {
+                                       tran.rollback();
+                               } catch(SandeshaException e2) {
+                                       log.debug("Exception rolling back 
tran", e2);
+                               }
+                       }
+               } finally {
+                       // Release the lock
+                       if (workId !=null && lock!=null && 
lock.ownsLock(workId, this)) {
+                               lock.removeWork(workId);
+                       }
+               }
+                               
+               if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+       }
+
+       private void invokeMessage(Transaction tran) {
+               if(log.isDebugEnabled()) log.debug("Enter: 
InvokerWorker::invokeMessage");
+
                Transaction transaction = null;
                MessageContext msgToInvoke = null;
                
@@ -54,7 +195,11 @@
                        InvokerBeanMgr invokerBeanMgr = 
storageManager.getInvokerBeanMgr();
                        
                        //starting a transaction
-                       transaction = storageManager.getTransaction();
+                       if(tran == null) {
+                               transaction = storageManager.getTransaction();
+                       } else {
+                               transaction = tran;
+                       }
                        
                        InvokerBean invokerBean = 
invokerBeanMgr.retrieve(messageContextKey);
 
@@ -153,7 +298,7 @@
                                        
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), 
storageManager);
                                        // exit from current iteration. (since 
an entry
                                        // was removed)
-                                       if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::run Last message return");     
+                                       if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::invokeMessage Last message return");   
                                        if(transaction != null && 
transaction.isActive()) transaction.commit();
                                        return;
                                }
@@ -181,10 +326,6 @@
                        if (log.isErrorEnabled())
                                log.error(e.toString(), e);
                } finally {
-                       if (workId !=null && lock!=null) {
-                               lock.removeWork(workId);
-                       }
-
                        if (transaction!=null && transaction.isActive()) {
                                try {
                                        transaction.rollback();
@@ -195,7 +336,7 @@
                        }
                }
                
-               if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+               if(log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::invokeMessage");
        }
 
        private void makeMessageReadyForReinjection(MessageContext 
messageContext) {

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
 Tue Nov  6 01:05:28 2007
@@ -222,7 +222,6 @@
 toBeanNotSet=The 'To' Sequence Property Bean has not been set for the sequence.
 cannotFindTransportInDesc=Cannot find the transport in description {0} in the 
ConfigurationContext.
 invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}'' 
element.
-invokerNotFound=An invoker thread was not found to dispatch messages on the 
inbound sequence {0}.
 cannotSetPolicyBeanServiceNull=Cannot set the given SandeshaPolicyBean since 
the AxisService is not present
 
 #------------------



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to