| Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr on MAIN | |||
| Broker.java | +33 | -1 | 1.18 -> 1.19 |
| flow/cluster/ClusterFlow.java | +2 | -2 | 1.10 -> 1.11 |
| flow/st/STFlow.java | +2 | -2 | 1.5 -> 1.6 |
| flow/Flow.java | +13 | -1 | 1.6 -> 1.7 |
| /AbstractFlow.java | +58 | -1 | 1.7 -> 1.8 |
| flow/seda/SedaFlow.java | +2 | -2 | 1.9 -> 1.10 |
| +110 | -9 | ||
Added ability to suspend/resume the NMR processing
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.18 -r1.19 --- Broker.java 5 Aug 2005 08:13:32 -0000 1.18 +++ Broker.java 22 Aug 2005 09:25:43 -0000 1.19 @@ -23,6 +23,8 @@
import javax.jbi.component.Component; import javax.jbi.messaging.MessagingException; import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.management.JMException; +import javax.management.MBeanOperationInfo;
import javax.resource.spi.work.WorkManager; import javax.xml.namespace.QName; import org.apache.commons.logging.Log;
@@ -36,6 +38,8 @@
import org.servicemix.jbi.framework.Registry; import org.servicemix.jbi.management.BaseLifeCycle; import org.servicemix.jbi.management.ManagementContext;
+import org.servicemix.jbi.management.OperationInfoHelper; +import org.servicemix.jbi.management.ParameterHelper;
import org.servicemix.jbi.messaging.ExchangePacket; import org.servicemix.jbi.messaging.MessageExchangeImpl; import org.servicemix.jbi.nmr.flow.Flow;
@@ -51,7 +55,7 @@
/** * The Broker handles Nomalised Message Routing within ServiceMix *
- * @version $Revision: 1.18 $
+ * @version $Revision: 1.19 $
*/
public class Broker extends BaseLifeCycle {
private JBIContainer container;
@@ -184,6 +188,20 @@
public Flow getFlow(){
return this.flow;
}
+
+ /**
+ * suspend the flow to prevent any message exchanges
+ */
+ public void suspend(){
+ flow.suspend();
+ }
+
+ /**
+ * resume message exchange processing
+ */
+ public void resume(){
+ flow.resume();
+ }
/**
* Route an ExchangePacket to a destination
@@ -376,6 +394,20 @@
else {
return new ProducerComponentEndpointFilter(component);
}
+ }
+
+ /**
+ * Get an array of MBeanOperationInfo
+ *
+ * @return array of OperationInfos
+ * @throws JMException
+ */
+ public MBeanOperationInfo[] getOperationInfos() throws JMException {
+ OperationInfoHelper helper = new OperationInfoHelper();
+ helper.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing");
+ helper.addOperation(getObjectToManage(), "resume", "resume the NMR processing");
+
+ return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
}
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/cluster
diff -u -r1.10 -r1.11 --- ClusterFlow.java 27 Jul 2005 07:35:20 -0000 1.10 +++ ClusterFlow.java 22 Aug 2005 09:25:44 -0000 1.11 @@ -54,7 +54,7 @@
/** * Use for message routing amonst a cluster of containers. All routing/cluster registration happens automatically *
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class ClusterFlow extends SedaFlow implements ClusterListener, MessageListener {
private static final Log log = LogFactory.getLog(ClusterFlow.class);
@@ -172,7 +172,7 @@
* @param packet
* @throws JBIException
*/
- public synchronized void send(ExchangePacket packet) throws JBIException {
+ protected synchronized void doSend(ExchangePacket packet) throws JBIException {
ComponentNameSpace cns = packet.getDestinationId();
SedaQueue queue = (SedaQueue) queueMap.get(cns);
if (queue == null) {
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/st
diff -u -r1.5 -r1.6 --- STFlow.java 29 Jun 2005 16:19:54 -0000 1.5 +++ STFlow.java 22 Aug 2005 09:25:44 -0000 1.6 @@ -25,7 +25,7 @@
/** * A simple Straight through flow *
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class STFlow extends AbstractFlow {
@@ -35,7 +35,7 @@
* @param packet
* @throws MessagingException
*/
- public void send(ExchangePacket packet) throws MessagingException {
+ protected void doSend(ExchangePacket packet) throws MessagingException {
doRouting(packet);
}
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow
diff -u -r1.6 -r1.7 --- Flow.java 29 Jun 2005 16:19:54 -0000 1.6 +++ Flow.java 22 Aug 2005 09:25:44 -0000 1.7 @@ -26,7 +26,7 @@
/** * A Flow provides different dispatch policies within the NMR *
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public interface Flow {
@@ -68,5 +68,17 @@
* @throws JBIException
*/
public void shutDown() throws JBIException;
+ + + /** + * suspend the flow to prevent any message exchanges + */ + public void suspend(); + + + /** + * resume message exchange processing + */ + public void resume();
}
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow
diff -u -r1.7 -r1.8 --- AbstractFlow.java 29 Jun 2005 16:19:54 -0000 1.7 +++ AbstractFlow.java 22 Aug 2005 09:25:44 -0000 1.8 @@ -24,20 +24,27 @@
import javax.management.JMException; import javax.management.MBeanAttributeInfo; import javax.management.ObjectName;
+import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory;
import org.servicemix.jbi.framework.ComponentNameSpace; import org.servicemix.jbi.framework.LocalComponentConnector; import org.servicemix.jbi.management.AttributeInfoHelper; import org.servicemix.jbi.management.BaseLifeCycle; import org.servicemix.jbi.messaging.ExchangePacket; import org.servicemix.jbi.nmr.Broker;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; +import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
/** * A simple Straight through flow *
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
+ private static final Log log = LogFactory.getLog(AbstractFlow.class);
protected Broker broker;
+ private ReadWriteLock lock = new ReentrantWriterPreferenceReadWriteLock(); + private Thread suspendThread = null;
/**
* Initialize the Region
@@ -72,6 +79,9 @@
* @throws JBIException
*/
public void stop() throws JBIException{
+ if (suspendThread != null){
+ suspendThread.interrupt();
+ }
super.stop();
}
@@ -83,6 +93,53 @@
super.shutDown();
}
+
+ /**
+ * Distribute an ExchangePacket
+ * @param packet
+ * @throws JBIException
+ */
+ public void send(ExchangePacket packet) throws JBIException{
+ try {
+ lock.readLock().acquire();
+ doSend(packet);
+ }
+ catch (InterruptedException e) {
+ log.warn("Got interuppted exception grabbing flow lock",e);
+ }finally{
+ lock.readLock().release();
+ }
+ }
+
+ /**
+ * suspend the flow to prevent any message exchanges
+ */
+ public synchronized void suspend(){
+ try {
+ lock.writeLock().acquire();
+ suspendThread = Thread.currentThread();
+ }
+ catch (InterruptedException e) {
+ log.warn("Got interuppted exception grabbing suspend lock",e);
+ }
+
+ }
+
+
+ /**
+ * resume message exchange processing
+ */
+ public synchronized void resume(){
+ lock.writeLock().release();
+ suspendThread = null;
+ }
+
+ /**
+ * Do the Flow specific routing
+ * @param packet
+ * @throws JBIException
+ */
+ protected abstract void doSend(ExchangePacket packet) throws JBIException;
/**
* Distribute an ExchangePacket
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/seda
diff -u -r1.9 -r1.10 --- SedaFlow.java 27 Jul 2005 07:35:20 -0000 1.9 +++ SedaFlow.java 22 Aug 2005 09:25:44 -0000 1.10 @@ -42,7 +42,7 @@
/** * A simple Straight through flow *
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SedaFlow extends AbstractFlow implements ComponentPacketEventListener {
private static final Log log = LogFactory.getLog(SedaFlow.class);
@@ -134,7 +134,7 @@
* @param packet
* @throws JBIException
*/
- public synchronized void send(ExchangePacket packet) throws JBIException {
+ protected synchronized void doSend(ExchangePacket packet) throws JBIException {
ComponentNameSpace cns = packet.getDestinationId();
if (packet.isOutbound()) {
SedaQueue queue = (SedaQueue) queueMap.get(cns);
