| Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms on MAIN | |||
| JMSFlow.java | +19 | -4 | 1.2 -> 1.3 |
Changed so that each component subscribes to an individual queue - to allow for load balancing across clusters.
servicemix/base/src/main/java/org/servicemix/jbi/nmr/flow/jms
diff -u -r1.2 -r1.3 --- JMSFlow.java 24 Aug 2005 14:07:24 -0000 1.2 +++ JMSFlow.java 26 Aug 2005 10:41:13 -0000 1.3 @@ -59,12 +59,12 @@
/** * Use for message routing among a network containers. All routing/registration happens automatically *
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSFlow extends SedaFlow implements ConsumerAdvisoryEventListener, MessageListener {
private static final Log log = LogFactory.getLog(JMSFlow.class);
private static final String INBOUND_PREFIX = "org.servicemix.inbound.";
- private String jmsURL = ActiveMQConnection.DEFAULT_URL;
+ private String jmsURL = "peer://org.servicemix?persistent=false";
private String userName;
private String password;
private ConnectionFactory connectionFactory;
@@ -78,6 +78,7 @@
private ConsumerAdvisor advisor;
private Map networkNodeKeyMap = new ConcurrentHashMap();
private Map networkComponentKeyMap = new ConcurrentHashMap();
+ private Map consumerMap = new ConcurrentHashMap();
private SynchronizedBoolean started = new SynchronizedBoolean(false);
/**
@@ -305,6 +306,20 @@
public void onEvent(ComponentPacketEvent event) {
super.onEvent(event);
try {
+ String componentName = event.getPacket().getComponentNameSpace().getName();
+ if (event.getStatus() == ComponentPacketEvent.ACTIVATED){
+
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX +componentName);
+ MessageConsumer consumer = inboundSession.createConsumer(queue);
+ consumer.setMessageListener(this);
+ consumerMap.put(componentName,consumer);
+ }else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED){
+ MessageConsumer consumer = (MessageConsumer) consumerMap.remove(componentName);
+ if (consumer != null){
+ consumer.close();
+ }
+
+ }
// broadcast change to the network
ObjectMessage msg = broadcastSession.createObjectMessage(event);
topicProducer.send(msg);
@@ -352,8 +367,8 @@
}
else {
try {
- String containerName = cc.getComponentNameSpace().getContainerName(); - Queue queue = inboundSession.createQueue(INBOUND_PREFIX + containerName);
+ String componentName = cc.getComponentNameSpace().getName(); + Queue queue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
ObjectMessage msg = inboundSession.createObjectMessage(packet);
queueProducer.send(queue, msg);
}
