Author: mlovett
Date: Wed Dec  6 03:00:57 2006
New Revision: 483026

URL: http://svn.apache.org/viewvc?view=rev&rev=483026
Log:
Apply Tom's patch to refactor sender and invoker threads, see SANDESHA2-61

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Wed Dec  6 03:00:57 2006
@@ -12,6 +12,7 @@
        public static final String 
cannotInitSecurityManager="cannotInitSecurityManager";
        public static final String 
securityManagerMustImplement="securityManagerMustImplement";
        public static final String 
cannotFindModulePolicies="cannotFindModulePolicies";
+       public static final String cannotPauseThread = "cannotPauseThread";
 
        public static final String commitError="commitError";
        public static final String rollbackError="rollbackError";

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 Wed Dec  6 03:00:57 2006
@@ -29,6 +29,7 @@
 cannotInitSecurityManager=Cannot initialize the given security manager due to 
exception {0}.
 securityManagerMustImplement=SecurityManager {0} must implement the 
org.apache.sandesha2.storage.StorageManager interface.
 cannotFindModulePolicies=No policies were found in the module.xml at the 
module initiation time
+cannotPauseThread=Cannot pause a non-running thread.
 
 commitError=Exception thrown when trying to commit the transaction: {0}
 rollbackError=Exception thrown when trying to rollback the transaction: {0}

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
Wed Dec  6 03:00:57 2006
@@ -22,8 +22,6 @@
 import java.util.List;
 
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.axis2.util.threadpool.ThreadPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -48,25 +46,16 @@
  * to find weather there are any messages to me invoked.
  */
 
-public class Invoker extends Thread {
+public class Invoker extends SandeshaThread {
 
-       private boolean runInvoker = false;
-       private ArrayList workingSequences = new ArrayList();
-       private ConfigurationContext context = null;
        private static final Log log = LogFactory.getLog(Invoker.class);
        
-       private boolean hasStoppedInvoking = false;
-       private boolean hasPausedInvoking = false;
-       private boolean pauseRequired = false;
-       
-       private transient ThreadFactory threadPool;
-       public int INVOKER_THREADPOOL_SIZE = 5;
+       public static final int INVOKER_THREADPOOL_SIZE = 5;
 
        private WorkerLock lock = null;
        
        public Invoker() {
-               threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
-                               INVOKER_THREADPOOL_SIZE);
+               super(INVOKER_THREADPOOL_SIZE, 
Sandesha2Constants.INVOKER_SLEEP_TIME);
                lock = new WorkerLock ();
        }
 
@@ -75,53 +64,12 @@
                        log.debug("Enter: 
InOrderInvoker::stopInvokerForTheSequence, "
                                        + sequenceID);
 
-               workingSequences.remove(sequenceID);
-               if (workingSequences.size() == 0) {
-                       runInvoker = false;
-               }
+               super.stopThreadForSequence(sequenceID);
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
InOrderInvoker::stopInvokerForTheSequence");
        }
        
-       
-       /**
-        * Waits for the invoking thread to pause
-        */
-       public synchronized void blockForPause(){
-               while(pauseRequired){
-                       //someone else is requesting a pause - wait for them to 
finish
-                       try{
-                               wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-                       }catch(InterruptedException e){
-                               //ignore
-                       }
-               }
-               
-         //we can now request a pause - the next pause will be ours
-         pauseRequired = true;
-                               
-               if(hasStoppedInvoking() || !isInvokerStarted()){
-                       throw new IllegalStateException("Cannot pause a 
non-running invoker thread"); //TODO NLS
-               }
-               while(!hasPausedInvoking){
-                       //wait for our pause to come around
-                       try{
-                               wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-                       }catch(InterruptedException e){
-                               //ignore
-                       }
-                       
-               }
-               //the invoker thread is now paused
-       }
-       
-       private synchronized void finishPause(){
-               //indicate that the current pause is no longer required.
-               pauseRequired = false;
-               notifyAll();
-       }
-       
        /**
         * Forces dispatch of queued messages to the application.
         * NOTE: may break ordering
@@ -242,31 +190,21 @@
        public synchronized void stopInvoking() {
                if (log.isDebugEnabled())
                        log.debug("Enter: InOrderInvoker::stopInvoking");
-               //NOTE: we do not take acount of pausing when stopping.
-               //The call to stop will wait until the invoker has exited the 
loop
-               if (isInvokerStarted()) {
-                       // the invoker is started so stop it
-                       runInvoker = false;
-                       // wait for it to finish
-                       while (!hasStoppedInvoking()) {
-                               try {
-                                       
wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-                               } catch (InterruptedException e1) {
-                                       log.debug(e1.getMessage());
-                               }
-                       }
-               }
+
+               super.stopRunning();
 
                if (log.isDebugEnabled())
                        log.debug("Exit: InOrderInvoker::stopInvoking");
        }
 
        public synchronized boolean isInvokerStarted() {
-               if (log.isDebugEnabled()) {
-                       log.debug("Enter: InOrderInvoker::isInvokerStarted");
-                       log.debug("Exit: InOrderInvoker::isInvokerStarted, " + 
runInvoker);
+               boolean isThreadStarted = super.isThreadStarted();
+               if(!isThreadStarted){
+                       //to avoid too much noise we should only trace if the 
invoker is not started
+                       if (log.isDebugEnabled())
+                               log.debug("invoker not started");       
                }
-               return runInvoker;
+               return isThreadStarted;
        }
 
        public synchronized void runInvokerForTheSequence(
@@ -274,47 +212,12 @@
                if (log.isDebugEnabled())
                        log.debug("Enter: 
InOrderInvoker::runInvokerForTheSequence");
 
-               if (!workingSequences.contains(sequenceID))
-                       workingSequences.add(sequenceID);
-
-               if (!isInvokerStarted()) {
-                       this.context = context;
-                       runInvoker = true; // so that isSenderStarted()=true.
-                       super.start();
-               }
+               super.runThreadForSequence(context, sequenceID);
+               
                if (log.isDebugEnabled())
                        log.debug("Exit: 
InOrderInvoker::runInvokerForTheSequence");
        }
 
-       private synchronized boolean hasStoppedInvoking() {
-               if (log.isDebugEnabled()) {
-                       log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
-                       log
-                                       .debug("Exit: 
InOrderInvoker::hasStoppedInvoking, "
-                                                       + hasStoppedInvoking);
-               }
-               return hasStoppedInvoking;
-       }
-
-       public void run() {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: InOrderInvoker::run");
-
-               try {
-                       internalRun();
-               } finally {
-                       // flag that we have exited the run loop and notify any 
waiting
-                       // threads
-                       synchronized (this) {
-                               hasStoppedInvoking = true;
-                               notify();
-                       }
-               }
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: InOrderInvoker::run");
-       }
-
        private void addOutOfOrderInvokerBeansToList(String sequenceID, 
                        StorageManager strMgr, List list)throws 
SandeshaException{
                if (log.isDebugEnabled())
@@ -351,7 +254,7 @@
                        log.debug("Exit: 
InOrderInvoker::addOutOfOrderInvokerBeansToList");
        }
        
-       private void internalRun() {
+       protected void internalRun() {
                if (log.isDebugEnabled())
                        log.debug("Enter: InOrderInvoker::internalRun");
                
@@ -368,25 +271,8 @@
                                log.debug(ex.getMessage());
                        }
                                        
-                       //see if we need to pause
-                       synchronized(this){
-                               
-                               while(pauseRequired){
-                                       if(!hasPausedInvoking){
-                                               //let the requester of this 
pause know we are now pausing
-                                         hasPausedInvoking = true;
-                                         notifyAll();                          
                
-                                       }
-                                       //now we pause
-                                 try{
-                                       
wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-                                 }catch(InterruptedException e){
-                                       //ignore
-                                 }
-                               }//end while
-                               //the request to pause has finished so we are 
no longer pausing
-                               hasPausedInvoking = false;
-                       }
+                       //pause if we have to
+                       doPauseIfNeeded();
 
                        Transaction transaction = null;
                        boolean rolebacked = false;

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Wed Dec  6 03:00:57 2006
@@ -17,12 +17,8 @@
 
 package org.apache.sandesha2.workers;
 
-import java.util.ArrayList;
-
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.axis2.util.threadpool.ThreadPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -41,92 +37,54 @@
  * Sender table to find out any entries that should be sent.
  */
 
-public class Sender extends Thread {
+public class Sender extends SandeshaThread {
+
 
-       private boolean runSender = false;
-       private ArrayList workingSequences = new ArrayList();
-       private ConfigurationContext context = null;
        private static final Log log = LogFactory.getLog(Sender.class);
-       private boolean hasStopped = false;
        
-    private transient ThreadFactory threadPool;
-    public int SENDER_THREADPOOL_SIZE = 5;
-    
-    private WorkerLock lock = null;
+
+  public static final int SENDER_THREADPOOL_SIZE = 5;
+  
+  private WorkerLock lock = null;
     
     public Sender () {
-       threadPool = new ThreadPool 
(SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+       super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
        lock = new WorkerLock ();
     }
 
        public synchronized void stopSenderForTheSequence(String sequenceID) {
                if (log.isDebugEnabled())
                        log.debug("Enter: Sender::stopSenderForTheSequence, " + 
sequenceID);
-               workingSequences.remove(sequenceID);
-               if (workingSequences.size() == 0) {
-                       runSender = false;
-               }
+               
+               super.stopThreadForSequence(sequenceID);
+               
                if (log.isDebugEnabled())
                        log.debug("Exit: Sender::stopSenderForTheSequence");
        }
+       
 
        public synchronized void stopSending() {
                if (log.isDebugEnabled())
                        log.debug("Enter: Sender::stopSending");
 
-               if (isSenderStarted()) {
-                       // the sender is started so stop it
-                       runSender = false;
-                       // wait for it to finish
-                       while (!hasStoppedSending()) {
-                               try {
-                                       
wait(Sandesha2Constants.SENDER_SLEEP_TIME);
-                               } catch (InterruptedException e1) {
-                                       log.debug(e1.getMessage());
-                               }
-                       }
-               }
+               super.stopRunning();
 
                if (log.isDebugEnabled())
                        log.debug("Exit: Sender::stopSending");
        }
 
-       private synchronized boolean hasStoppedSending() {
-               if (log.isDebugEnabled()) {
-                       log.debug("Enter: Sender::hasStoppedSending");
-                       log.debug("Exit: Sender::hasStoppedSending, " + 
hasStopped);
-               }
-               return hasStopped;
-       }
-
        public synchronized boolean isSenderStarted() {
-               if (log.isDebugEnabled()) {
-                       log.debug("Enter: Sender::isSenderStarted");
-                       log.debug("Exit: Sender::isSenderStarted, " + 
runSender);
+               boolean isThreadStarted = super.isThreadStarted();
+               if(!isThreadStarted){
+                       //to avoid too much noise we should only trace if the 
sender is not started
+                       if (log.isDebugEnabled())
+                               log.debug("sender not started");        
                }
-               return runSender;
+               return isThreadStarted;
        }
 
-       public void run() {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::run");
-
-               try {
-                       internalRun();
-               } finally {
-                       // flag that we have exited the run loop and notify any 
waiting
-                       // threads
-                       synchronized (this) {
-                               hasStopped = true;
-                               notify();
-                       }
-               }
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::run");
-       }
 
-       private void internalRun() {
+       protected void internalRun() {
                if (log.isDebugEnabled())
                        log.debug("Enter: Sender::internalRun");
 
@@ -151,6 +109,9 @@
                                log.debug(e1.getMessage());
                                log.debug("End printing Interrupt...");
                        }
+                       
+                       //pause if we have to
+                       doPauseIfNeeded();
 
                        Transaction transaction = null;
                        boolean rolebacked = false;
@@ -164,7 +125,7 @@
                                        log.debug(message);
                                        throw new SandeshaException(message);
                                }
-
+                               
                                // TODO make sure this locks on reads.
                                transaction = storageManager.getTransaction();
 
@@ -277,14 +238,8 @@
                if (log.isDebugEnabled())
                        log.debug("Enter: Sender::runSenderForTheSequence, " + 
sequenceID);
 
-               if (sequenceID != null && 
!workingSequences.contains(sequenceID))
-                       workingSequences.add(sequenceID);
-
-               if (!isSenderStarted()) {
-                       this.context = context;
-                       runSender = true; // so that isSenderStarted()=true.
-                       super.start();
-               }
+               runThreadForSequence(context, sequenceID);
+               
                if (log.isDebugEnabled())
                        log.debug("Exit: Sender::runSenderForTheSequence");
        }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to