Author: gatfora
Date: Thu Dec  7 08:04:03 2006
New Revision: 483508

URL: http://svn.apache.org/viewvc?view=rev&rev=483508
Log:
Another update to consolidate the common processing for Sender and Invoker
into SandeshaThread; see SANDESHA2-61.

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
 Thu Dec  7 08:04:03 2006
@@ -538,10 +538,6 @@
        
        String DEFAULT_STORAGE_MANAGER = INMEMORY_STORAGE_MANAGER;
        
-       String SENDER = "Sender";
-       
-       String INVOKER = "Invoker";
-       
        String POLLING_MANAGER = "PollingManager";
        
        String WITHIN_TRANSACTION = "WithinTransaction";

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
 Thu Dec  7 08:04:03 2006
@@ -821,7 +821,7 @@
 
                        //only do this if we are running inOrder
                        
if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
-                               Invoker invoker = (Invoker) 
configContext.getProperty(Sandesha2Constants.INVOKER);
+                               Invoker invoker = 
(Invoker)SandeshaUtil.getSandeshaStorageManager(configContext, 
configContext.getAxisConfiguration()).getInvoker();
                                if (invoker==null){
                                        throw new 
SandeshaException(SandeshaMessageHelper.getMessage(
                                                
SandeshaMessageKeys.invokerNotFound, sequenceID));

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
 Thu Dec  7 08:04:03 2006
@@ -25,6 +25,7 @@
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.workers.SandeshaThread;
 
 /**
  * Storage managers should extend this.
@@ -51,6 +52,10 @@
        public abstract void initStorage (AxisModule moduleDesc) throws 
SandeshaStorageException;
 
        public abstract Transaction getTransaction();
+       
+       public abstract SandeshaThread getSender();
+       
+       public abstract SandeshaThread getInvoker();
 
        public abstract CreateSeqBeanMgr getCreateSeqBeanMgr();
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Thu Dec  7 08:04:03 2006
@@ -38,6 +38,9 @@
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.RMBean;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.Invoker;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.Sender;
 
 public class InMemoryStorageManager extends StorageManager {
 
@@ -50,6 +53,8 @@
     private SequencePropertyBeanMgr sequencePropertyBeanMgr = null;
     private SenderBeanMgr senderBeanMgr = null;
     private InvokerBeanMgr invokerBeanMgr = null;
+    private Sender sender = null;
+    private Invoker invoker = null;
     private HashMap transactions = new HashMap();
     
        public InMemoryStorageManager(ConfigurationContext context) {
@@ -60,6 +65,8 @@
                this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
                this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, 
context);
                this.sequencePropertyBeanMgr = new 
InMemorySequencePropertyBeanMgr (this, context);
+               this.sender = new Sender();
+               this.invoker = new Invoker();
        }
 
        public Transaction getTransaction() {
@@ -91,6 +98,20 @@
                }
        }
        
+       /** 
+        * Gets the Invoker for this Storage manager
+        */
+       public SandeshaThread getInvoker() {
+         return invoker;
+  }
+
+       /** 
+        * Gets the Sender for this Storage manager
+        */
+       public SandeshaThread getSender() {
+         return sender;
+  }
+
        void enlistBean(RMBean bean) throws SandeshaStorageException {
                InMemoryTransaction t = null;
                synchronized (transactions) {
@@ -201,7 +222,9 @@
        public void storeSOAPEnvelope(SOAPEnvelope envelope, String key) throws 
SandeshaStorageException {
                // TODO no real value
        }
+
        
        
 }
+
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
Thu Dec  7 08:04:03 2006
@@ -78,8 +78,7 @@
 import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.workers.Invoker;
-import org.apache.sandesha2.workers.Sender;
+import org.apache.sandesha2.workers.SandeshaThread;
 import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.CloseSequence;
@@ -213,44 +212,38 @@
                return sortedList;
        }
 
-       public static void startSenderForTheSequence(ConfigurationContext 
context, String sequenceID) {
+       public static void startSenderForTheSequence(ConfigurationContext 
context, String sequenceID) throws SandeshaException {
                if (log.isDebugEnabled())
                        log.debug("Enter: 
SandeshaUtil::startSenderForTheSequence , context " + context + ", sequenceID " 
+ sequenceID);
                
-               Sender sender = (Sender) 
context.getProperty(Sandesha2Constants.SENDER);
-               
-               if (sender!=null)
-                       sender.runSenderForTheSequence(context, sequenceID);
-               else {
-                       sender = new Sender ();
-                       context.setProperty(Sandesha2Constants.SENDER,sender);
-                       sender.runSenderForTheSequence(context, sequenceID);
-               }
+               SandeshaThread sender = getSandeshaStorageManager(context, 
context.getAxisConfiguration()).getSender();         
+               sender.runThreadForSequence(context, sequenceID);
                
                if (log.isDebugEnabled())
                        log.debug("Exit: 
SandeshaUtil::startSenderForTheSequence");
        }
        
-       public static void stopSender(ConfigurationContext context) {
-               Sender sender = (Sender) 
context.getProperty(Sandesha2Constants.SENDER);
-               
-               if (sender!=null) {
-                       sender.stopSending ();
-               }
+       public static void stopSender(ConfigurationContext context) throws 
SandeshaException {
+               SandeshaThread sender = getSandeshaStorageManager(context, 
context.getAxisConfiguration()).getSender();
+               sender.stopRunning();           
        }
 
-       public static void startInvokerForTheSequence(ConfigurationContext 
context, String sequenceID) {
+       public static void startInvokerForTheSequence(ConfigurationContext 
context, String sequenceID) throws SandeshaException {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: 
SandeshaUtil::startInvokerForTheSequence , context " + context + ", sequenceID 
" + sequenceID);
                
-               Invoker invoker = (Invoker) 
context.getProperty(Sandesha2Constants.INVOKER);
-               if (invoker!=null)
-                       invoker.runInvokerForTheSequence(context,sequenceID);
-               else {
-                       invoker = new Invoker ();
-                       context.setProperty(Sandesha2Constants.INVOKER,invoker);
-                       invoker.runInvokerForTheSequence(context,sequenceID);
-               }
+               SandeshaThread invoker = getSandeshaStorageManager(context, 
context.getAxisConfiguration()).getInvoker();
+               invoker.runThreadForSequence(context,sequenceID);
+
+               if (log.isDebugEnabled())
+                       log.debug("Exit: 
SandeshaUtil::startInvokerForTheSequence");                    
        }
-       
+
+       public static void stopInvoker(ConfigurationContext context) throws 
SandeshaException {
+               SandeshaThread invoker = getSandeshaStorageManager(context, 
context.getAxisConfiguration()).getInvoker();
+               invoker.stopRunning();
+       }
+
        public static void startPollingManager (ConfigurationContext 
configurationContext) throws SandeshaException {
                PollingManager pollingManager = (PollingManager) 
configurationContext.getProperty(
                                Sandesha2Constants.POLLING_MANAGER);
@@ -270,12 +263,6 @@
                        pollingManager.stopPolling ();
        }
        
-       public static void stopInvoker(ConfigurationContext context) {
-               Invoker invoker = (Invoker) 
context.getProperty(Sandesha2Constants.INVOKER);
-               if (invoker!=null)
-                       invoker.stopInvoking();
-       }
-
        public static String getMessageTypeString(int messageType) {
                switch (messageType) {
                case Sandesha2Constants.MessageTypes.CREATE_SEQ:

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
Thu Dec  7 08:04:03 2006
@@ -58,17 +58,6 @@
                super(INVOKER_THREADPOOL_SIZE, 
Sandesha2Constants.INVOKER_SLEEP_TIME);
                lock = new WorkerLock ();
        }
-
-       public synchronized void stopInvokerForTheSequence(String sequenceID) {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: 
InOrderInvoker::stopInvokerForTheSequence, "
-                                       + sequenceID);
-
-               super.stopThreadForSequence(sequenceID);
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: 
InOrderInvoker::stopInvokerForTheSequence");
-       }
        
        /**
         * Forces dispatch of queued messages to the application.
@@ -187,37 +176,6 @@
                }
        }
 
-       public synchronized void stopInvoking() {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: InOrderInvoker::stopInvoking");
-
-               super.stopRunning();
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: InOrderInvoker::stopInvoking");
-       }
-
-       public synchronized boolean isInvokerStarted() {
-               boolean isThreadStarted = super.isThreadStarted();
-               if(!isThreadStarted){
-                       //to avoid too much noise we should only trace if the 
invoker is not started
-                       if (log.isDebugEnabled())
-                               log.debug("invoker not started");       
-               }
-               return isThreadStarted;
-       }
-
-       public synchronized void runInvokerForTheSequence(
-                       ConfigurationContext context, String sequenceID) {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: 
InOrderInvoker::runInvokerForTheSequence");
-
-               super.runThreadForSequence(context, sequenceID);
-               
-               if (log.isDebugEnabled())
-                       log.debug("Exit: 
InOrderInvoker::runInvokerForTheSequence");
-       }
-
        private void addOutOfOrderInvokerBeansToList(String sequenceID, 
                        StorageManager strMgr, List list)throws 
SandeshaException{
                if (log.isDebugEnabled())
@@ -262,7 +220,7 @@
                // try and give them all a chance to invoke messages.
                int nextIndex = 0;
 
-               while (isInvokerStarted()) {
+               while (isThreadStarted()) {
 
                        try {
                                
Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 Thu Dec  7 08:04:03 2006
@@ -21,6 +21,8 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.util.threadpool.ThreadFactory;
 import org.apache.axis2.util.threadpool.ThreadPool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 
@@ -29,6 +31,8 @@
  */
 public abstract class SandeshaThread extends Thread{
 
+       private static final Log log = LogFactory.getLog(SandeshaThread.class);
+
        private boolean runThread = false;
        private boolean hasStoppedRunning = false;
        private boolean hasPausedRunning = false;
@@ -46,11 +50,16 @@
                this.sleepTime = sleepTime;
        }
        
-       protected void stopThreadForSequence(String sequenceID){
+       public synchronized void stopThreadForSequence(String sequenceID){
+               if (log.isDebugEnabled())
+                       log.debug("Enter: 
SandeshaThread::stopThreadForSequence, " + sequenceID);
+               
                workingSequences.remove(sequenceID);
-               if (workingSequences.size() == 0) {
-                       runThread = false;
-               }
+               if (workingSequences.size() == 0) 
+                       runThread = false;              
+               
+               if (log.isDebugEnabled())
+                       log.debug("Exit: 
SandeshaThread::stopThreadForSequence");               
        }
        
        /**
@@ -91,6 +100,9 @@
        }
        
        public synchronized void stopRunning() {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: SandeshaThread::stopRunning");
+
                //NOTE: we do not take acount of pausing when stopping.
                //The call to stop will wait until the invoker has exited the 
loop
                if (isThreadStarted()) {
@@ -105,10 +117,16 @@
                                }
                        }
                }
-
+               
+               if (log.isDebugEnabled())
+                       log.debug("Exit: SandeshaThread::stopRunning");
        }
        
        public synchronized boolean isThreadStarted() {
+
+               if (!runThread && log.isDebugEnabled())
+                       log.debug("SandeshaThread not started");        
+
                return runThread;
        }
        

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Thu Dec  7 08:04:03 2006
@@ -18,7 +18,6 @@
 package org.apache.sandesha2.workers;
 
 import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -47,42 +46,10 @@
   
   private WorkerLock lock = null;
     
-    public Sender () {
-       super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
-       lock = new WorkerLock ();
-    }
-
-       public synchronized void stopSenderForTheSequence(String sequenceID) {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::stopSenderForTheSequence, " + 
sequenceID);
-               
-               super.stopThreadForSequence(sequenceID);
-               
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::stopSenderForTheSequence");
-       }
-       
-
-       public synchronized void stopSending() {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::stopSending");
-
-               super.stopRunning();
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::stopSending");
-       }
-
-       public synchronized boolean isSenderStarted() {
-               boolean isThreadStarted = super.isThreadStarted();
-               if(!isThreadStarted){
-                       //to avoid too much noise we should only trace if the 
sender is not started
-                       if (log.isDebugEnabled())
-                               log.debug("sender not started");        
-               }
-               return isThreadStarted;
-       }
-
+  public Sender () {
+       super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
+       lock = new WorkerLock ();
+  }    
 
        protected void internalRun() {
                if (log.isDebugEnabled())
@@ -99,7 +66,7 @@
                        return;
                }
 
-               while (isSenderStarted()) {
+               while (isThreadStarted()) {
 
                        try {
                                
Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
@@ -232,16 +199,6 @@
                }
                if (log.isDebugEnabled())
                        log.debug("Exit: Sender::internalRun");
-       }
-
-       public synchronized void runSenderForTheSequence(ConfigurationContext 
context, String sequenceID) {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: Sender::runSenderForTheSequence, " + 
sequenceID);
-
-               runThreadForSequence(context, sequenceID);
-               
-               if (log.isDebugEnabled())
-                       log.debug("Exit: Sender::runSenderForTheSequence");
        }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to