Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr on MAIN
Broker.java+33-11.18 -> 1.19
flow/cluster/ClusterFlow.java+2-21.10 -> 1.11
flow/st/STFlow.java+2-21.5 -> 1.6
flow/Flow.java+13-11.6 -> 1.7
    /AbstractFlow.java+58-11.7 -> 1.8
flow/seda/SedaFlow.java+2-21.9 -> 1.10
+110-9
6 modified files
Added ability to suspend/resume the NMR processing

servicemix/base/src/main/java/org/servicemix/jbi/nmr
Broker.java 1.18 -> 1.19
diff -u -r1.18 -r1.19
--- Broker.java	5 Aug 2005 08:13:32 -0000	1.18
+++ Broker.java	22 Aug 2005 09:25:43 -0000	1.19
@@ -23,6 +23,8 @@
 import javax.jbi.component.Component;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
 import javax.resource.spi.work.WorkManager;
 import javax.xml.namespace.QName;
 import org.apache.commons.logging.Log;
@@ -36,6 +38,8 @@
 import org.servicemix.jbi.framework.Registry;
 import org.servicemix.jbi.management.BaseLifeCycle;
 import org.servicemix.jbi.management.ManagementContext;
+import org.servicemix.jbi.management.OperationInfoHelper;
+import org.servicemix.jbi.management.ParameterHelper;
 import org.servicemix.jbi.messaging.ExchangePacket;
 import org.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.servicemix.jbi.nmr.flow.Flow;
@@ -51,7 +55,7 @@
 /**
  * The Broker handles Normalised Message Routing within ServiceMix
  *
- * @version $Revision: 1.18 $
+ * @version $Revision: 1.19 $
  */
 public class Broker  extends BaseLifeCycle {
     private JBIContainer container;
@@ -184,6 +188,20 @@
     public Flow getFlow(){
         return this.flow;
     }
+    
+    /**
+     * Suspend NMR processing by suspending the underlying flow, which
+     * prevents any message exchanges until resume() is called
+     */
+    public void suspend(){
+        this.flow.suspend();
+    }
+     
+    /**
+     * Resume NMR processing by resuming message exchange processing
+     * on the underlying flow
+     */
+    public void resume(){
+        this.flow.resume();
+    }
 
     /**
      * Route an ExchangePacket to a destination
@@ -376,6 +394,20 @@
         else {
             return new ProducerComponentEndpointFilter(component);
         }
+    }
+    
+    /**
+     * Build the management operations exposed by the Broker: the operations
+     * inherited from the super class plus "suspend" and "resume"
+     * 
+     * @return the inherited operations joined with the Broker specific ones
+     * @throws JMException
+     */
+    public MBeanOperationInfo[] getOperationInfos() throws JMException {
+        OperationInfoHelper brokerOps = new OperationInfoHelper();
+        brokerOps.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing");
+        brokerOps.addOperation(getObjectToManage(), "resume", "resume the NMR processing");
+        MBeanOperationInfo[] inherited = super.getOperationInfos();
+        return OperationInfoHelper.join(inherited, brokerOps.getOperationInfos());
     }
 
     

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/cluster
ClusterFlow.java 1.10 -> 1.11
diff -u -r1.10 -r1.11
--- ClusterFlow.java	27 Jul 2005 07:35:20 -0000	1.10
+++ ClusterFlow.java	22 Aug 2005 09:25:44 -0000	1.11
@@ -54,7 +54,7 @@
 /**
  * Used for message routing amongst a cluster of containers. All routing/cluster registration happens automatically
  * 
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
  */
 public class ClusterFlow extends SedaFlow implements ClusterListener, MessageListener {
     private static final Log log = LogFactory.getLog(ClusterFlow.class);
@@ -172,7 +172,7 @@
      * @param packet
      * @throws JBIException
      */
-    public synchronized void send(ExchangePacket packet) throws JBIException {
+    protected synchronized void doSend(ExchangePacket packet) throws JBIException {
         ComponentNameSpace cns = packet.getDestinationId();
         SedaQueue queue = (SedaQueue) queueMap.get(cns);
         if (queue == null) {

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/st
STFlow.java 1.5 -> 1.6
diff -u -r1.5 -r1.6
--- STFlow.java	29 Jun 2005 16:19:54 -0000	1.5
+++ STFlow.java	22 Aug 2005 09:25:44 -0000	1.6
@@ -25,7 +25,7 @@
 /**
  * A simple Straight through flow
  * 
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
  */
 public class STFlow extends AbstractFlow {
     
@@ -35,7 +35,7 @@
      * @param packet
      * @throws MessagingException
      */
-    public void send(ExchangePacket packet) throws MessagingException {
+    protected void doSend(ExchangePacket packet) throws MessagingException {
+        // straight through: the packet is handed directly to doRouting, nothing else happens here
         doRouting(packet);
     }
     

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow
Flow.java 1.6 -> 1.7
diff -u -r1.6 -r1.7
--- Flow.java	29 Jun 2005 16:19:54 -0000	1.6
+++ Flow.java	22 Aug 2005 09:25:44 -0000	1.7
@@ -26,7 +26,7 @@
 /**
  * A Flow provides different dispatch policies within the NMR
  *
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
  */
 public interface Flow  {
    
@@ -68,5 +68,17 @@
      * @throws JBIException
      */
     public void shutDown() throws JBIException;
+    
+    
+    /**
+     * Suspend the flow to prevent any message exchanges.
+     * Message exchange processing is resumed by calling {@link #resume()}.
+     */
+    public void suspend();
+    
+    
+    /**
+     * Resume message exchange processing after a call to {@link #suspend()}.
+     */
+    public void resume();
         
 }

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow
AbstractFlow.java 1.7 -> 1.8
diff -u -r1.7 -r1.8
--- AbstractFlow.java	29 Jun 2005 16:19:54 -0000	1.7
+++ AbstractFlow.java	22 Aug 2005 09:25:44 -0000	1.8
@@ -24,20 +24,27 @@
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.ObjectName;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.servicemix.jbi.framework.ComponentNameSpace;
 import org.servicemix.jbi.framework.LocalComponentConnector;
 import org.servicemix.jbi.management.AttributeInfoHelper;
 import org.servicemix.jbi.management.BaseLifeCycle;
 import org.servicemix.jbi.messaging.ExchangePacket;
 import org.servicemix.jbi.nmr.Broker;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
 /**
  * Abstract base class for Flow implementations
  * 
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
  */
 public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
+    private static final Log log = LogFactory.getLog(AbstractFlow.class);
     protected Broker broker;
+    private ReadWriteLock lock = new ReentrantWriterPreferenceReadWriteLock();
+    private Thread suspendThread = null;
 
     /**
      * Initialize the Region
@@ -72,6 +79,9 @@
      * @throws JBIException
      */
     public void stop() throws JBIException{  
+        // read the field once: resume() may clear it on another thread
+        // between the null check and the interrupt() call
+        Thread suspender = suspendThread;
+        if (suspender != null){
+            suspender.interrupt();
+        }
         super.stop();
     }
     
@@ -83,6 +93,53 @@
         super.shutDown();
         
     }
+    
+    /**
+     * Distribute an ExchangePacket. The flow's read lock is held while doSend()
+     * runs; suspend() takes the matching write lock, so sends wait while the
+     * flow is suspended.
+     * 
+     * @param packet the packet to distribute
+     * @throws JBIException if doSend() fails, or if the calling thread is
+     *         interrupted while waiting for the flow lock (the packet is then
+     *         not sent)
+     */
+    public void send(ExchangePacket packet) throws JBIException{
+        // acquire outside the try/finally below: the read lock must only be
+        // released when it was actually obtained
+        try {
+            lock.readLock().acquire();
+        }
+        catch (InterruptedException e) {
+            // restore the interrupt status and report that the packet was not sent
+            Thread.currentThread().interrupt();
+            throw new JBIException("Interrupted while acquiring the flow lock", e);
+        }
+        try {
+            doSend(packet);
+        }
+        finally {
+            lock.readLock().release();
+        }
+    }
+    
+    /**
+     * suspend the flow to prevent any message exchanges. Takes the flow's
+     * write lock and remembers the calling thread. Does nothing when the
+     * flow is already suspended.
+     */
+    public synchronized void suspend(){
+        if (suspendThread != null) {
+            // already suspended: acquiring again from another thread would
+            // block while holding this monitor, and resume() - which is also
+            // synchronized - could then never release the lock
+            return;
+        }
+        try {
+            lock.writeLock().acquire();
+            suspendThread = Thread.currentThread();
+        }
+        catch (InterruptedException e) {
+            // the flow stays un-suspended; keep the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            log.warn("Got interuppted exception grabbing suspend lock",e);
+        }
+    }
+    
+    
+    /**
+     * resume message exchange processing. Releases the write lock taken by
+     * suspend(). Does nothing when the flow is not currently suspended.
+     */
+    public synchronized void resume(){
+        if (suspendThread == null) {
+            // no write lock was taken by suspend(), so there is nothing to release
+            return;
+        }
+        lock.writeLock().release();
+        suspendThread = null;
+    }
+    
+    /**
+     * Do the Flow specific routing. Called by send() once the flow's read
+     * lock has been acquired.
+     * 
+     * @param packet the packet handed to send()
+     * @throws JBIException
+     */
+    protected abstract void doSend(ExchangePacket packet) throws JBIException;
 
     /**
      * Distribute an ExchangePacket

servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/seda
SedaFlow.java 1.9 -> 1.10
diff -u -r1.9 -r1.10
--- SedaFlow.java	27 Jul 2005 07:35:20 -0000	1.9
+++ SedaFlow.java	22 Aug 2005 09:25:44 -0000	1.10
@@ -42,7 +42,7 @@
 /**
  * A simple Straight through flow
  * 
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
  */
 public class SedaFlow extends AbstractFlow implements ComponentPacketEventListener {
     private static final Log log = LogFactory.getLog(SedaFlow.class);
@@ -134,7 +134,7 @@
      * @param packet
      * @throws JBIException
      */
-    public synchronized void send(ExchangePacket packet) throws JBIException {
+    protected synchronized void doSend(ExchangePacket packet) throws JBIException {
         ComponentNameSpace cns = packet.getDestinationId();
         if (packet.isOutbound()) {
             SedaQueue queue = (SedaQueue) queueMap.get(cns);
CVSspam 0.2.8



Reply via email to