Author: mckierna
Date: Tue Nov 13 04:04:18 2007
New Revision: 594502

URL: http://svn.apache.org/viewvc?rev=594502&view=rev
Log:
some refactoring and potential NPE protection

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Tue Nov 13 04:04:18 2007
@@ -382,7 +382,7 @@
                // then we should hand the message over to the invoker thread. 
If not, we can invoke
                // it directly ourselves.
                InvokerWorker worker = null;
-               if (SandeshaUtil.isInOrder(msgCtx) || 
storageManager.hasUserTransaction(msgCtx)) {
+               if (SandeshaUtil.isInOrder(msgCtx)) {
                    
                        InvokerBean invokerBean = new InvokerBean(key, msgNo, 
sequenceId);
                        ContextManager contextMgr = 
SandeshaUtil.getContextManager(configCtx);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 Tue Nov 13 04:04:18 2007
@@ -59,6 +59,7 @@
        private String  messageContextKey;
        private boolean ignoreNextMsg;
        private boolean pooledThread;
+       boolean lastMessageInvoked;
        
        public InvokerWorker (ConfigurationContext configurationContext, 
InvokerBean bean) {
                // All invoker workers need to use the same lock, so we point 
to the static one here.
@@ -99,11 +100,6 @@
        public void run() {
                if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, 
message " + messageNumber + ", sequence " + sequence);
                
-               // If we are not the holder of the correct lock, then we have 
to stop
-               if(lock != null && !lock.ownsLock(workId, this)) {
-                       if (log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::run, another worker holds the lock");
-                       return;
-               }
                
                Transaction tran = null;
                try {
@@ -111,10 +107,11 @@
                        Runnable nextRunnable = null;
 
                        // Invoke the first message
-                       invokeMessage(null);
+               lastMessageInvoked = invokeMessage(null);
 
                        // Look for the next message, so long as we are still 
processing normally
-                       while(!ignoreNextMsg) {
+                       while(!ignoreNextMsg && lastMessageInvoked) {
+                               if(log.isDebugEnabled()) 
log.debug("InvokerWorker:: looking for next msg to invoke");
                                InvokerBean finder = new InvokerBean();
                                finder.setSequenceID(sequence);
                                finder.setMsgNo(messageNumber + 1);
@@ -127,11 +124,12 @@
 
                                if(nextBean != null) {
                                        if(pooledThread) {
+                                               if(log.isDebugEnabled()) 
log.debug("InvokerWorker:: pooledThread");
                                                initializeFromBean(nextBean);
                                                final Transaction theTran = 
tran;
                                                Runnable work = new Runnable() {
                                                        public void run() {
-                                                               
invokeMessage(theTran);
+                                                               
lastMessageInvoked = invokeMessage(theTran);
                                                        }
                                                };
 
@@ -146,6 +144,7 @@
 
                                                tran = null;
                                        } else {
+                                               if(log.isDebugEnabled()) 
log.debug("InvokerWorker:: not pooled thread");
                                                nextWorker = new 
InvokerWorker(configurationContext, nextBean);
                                                nextWorker.setPooled();
                                                nextWorker.setWorkId(workId);
@@ -169,7 +168,7 @@
                                        // break out of the loop
                                        break;
                                }
-                       }
+                       }//end while
                                        
                        if (workId !=null && lock!=null) {
                                lock.removeWork(workId);
@@ -202,11 +201,18 @@
                if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
        }
 
-       private void invokeMessage(Transaction tran) {
+       private boolean invokeMessage(Transaction tran) {
                if(log.isDebugEnabled()) log.debug("Enter: 
InvokerWorker::invokeMessage");
 
                Transaction transaction = null;
                MessageContext msgToInvoke = null;
+               boolean messageInvoked = true;
+               
+           // If we are not the holder of the correct lock, then we have to 
stop
+           if(lock != null && (!lock.ownsLock(workId, this))) {
+               if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, 
another worker holds the lock");
+               return false;
+           }
                
                try {
                        
@@ -223,6 +229,11 @@
                        InvokerBean invokerBean = 
invokerBeanMgr.retrieve(messageContextKey);
 
                        msgToInvoke = 
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
+                       if(msgToInvoke==null){
+                               //return since there is nothing to do
+                               if(log.isDebugEnabled()) log.debug("null msg");
+                               return false;
+                       }
 
                        // ending the transaction before invocation.
                        if(transaction != null) {
@@ -288,6 +299,7 @@
 
                                if (transaction != null && 
transaction.isActive())
                                        transaction.rollback();
+                               messageInvoked = false;
                                
                                handleFault(rmMsg, e);
                        }
@@ -317,9 +329,9 @@
                                        
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), 
storageManager);
                                        // exit from current iteration. (since 
an entry
                                        // was removed)
-                                       if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::invokeMessage Last message return");   
                                        if(transaction != null && 
transaction.isActive()) transaction.commit();
-                                       return;
+                                       if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::invokeMessage Last message return " + 
messageInvoked);
+                                       return messageInvoked;  
                                }
                        }
                        
@@ -344,6 +356,7 @@
                } catch (Exception e) {
                        if (log.isErrorEnabled())
                                log.error(e.toString(), e);
+                       messageInvoked = false;
                } finally {
                        if (transaction!=null && transaction.isActive()) {
                                try {
@@ -355,7 +368,8 @@
                        }
                }
                
-               if(log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::invokeMessage");
+               if(log.isDebugEnabled()) log.debug("Exit: 
InvokerWorker::invokeMessage " + messageInvoked);
+               return messageInvoked;
        }
 
        private void makeMessageReadyForReinjection(MessageContext 
messageContext) {



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to