Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (972 => 973)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-30 16:04:08 UTC (rev 972)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-30 16:31:08 UTC (rev 973)
@@ -19,7 +19,6 @@
package org.servicemix.jbi.nmr.flow.jms;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.activemq.ActiveMQConnection;
@@ -77,6 +76,7 @@
private MessageProducer topicProducer;
private Topic broadcastTopic;
private Session broadcastSession;
+ private MessageConsumer broadcastConsumer;
private Session inboundSession;
private ConsumerAdvisor advisor;
private Map networkNodeKeyMap = new ConcurrentHashMap();
@@ -171,6 +171,7 @@
* @throws JBIException
*/
public void init(Broker broker, String subType) throws JBIException {
+ log.info(broker.getContainerName() + ": Initializing jms flow");
super.init(broker, subType);
broker.getRegistry().addComponentPacketListener(this);
try {
@@ -196,12 +197,8 @@
queueProducer = inboundSession.createProducer(null);
broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
- MessageConsumer broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
- broadcastConsumer.setMessageListener(this);
topicProducer = broadcastSession.createProducer(broadcastTopic);
topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- advisor = new ConsumerAdvisor(connection, broadcastTopic);
- advisor.addListener(this);
}
catch (JMSException e) {
log.error("Failed to initialize JMSFlow", e);
@@ -216,8 +213,13 @@
*/
public void start() throws JBIException {
if (started.compareAndSet(false, true)) {
+ log.info(broker.getContainerName() + ": Starting jms flow");
super.start();
try {
+ broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
+ broadcastConsumer.setMessageListener(this);
+ advisor = new ConsumerAdvisor(connection, broadcastTopic);
+ advisor.addListener(this);
advisor.start();
}
catch (JMSException e) {
@@ -234,9 +236,11 @@
*/
public void stop() throws JBIException {
if (started.compareAndSet(true, false)) {
+ log.info(broker.getContainerName() + ": Stopping jms flow");
super.stop();
try {
advisor.stop();
+ broadcastConsumer.close();
}
catch (JMSException e) {
JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
@@ -274,24 +278,26 @@
*/
public void onEvent(ComponentPacketEvent event) {
try {
- String componentName = event.getPacket().getComponentNameSpace().getName();
- if(event.getStatus()==ComponentPacketEvent.ACTIVATED){
- if(!consumerMap.containsKey(componentName)){
- Queue queue=inboundSession.createQueue(INBOUND_PREFIX+componentName);
- MessageConsumer consumer=inboundSession.createConsumer(queue);
- consumer.setMessageListener(this);
- consumerMap.put(componentName,consumer);
+ // broadcast internal changes to the network
+ if (started.get() && event.getPacket().getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
+ String componentName = event.getPacket().getComponentNameSpace().getName();
+ if (event.getStatus() == ComponentPacketEvent.ACTIVATED) {
+ if (!consumerMap.containsKey(componentName)) {
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
+ MessageConsumer consumer = inboundSession.createConsumer(queue);
+ consumer.setMessageListener(this);
+ consumerMap.put(componentName,consumer);
+ }
+ } else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
+ MessageConsumer consumer = (MessageConsumer) consumerMap.remove(componentName);
+ if (consumer != null){
+ consumer.close();
+ }
}
- }else if(event.getStatus()==ComponentPacketEvent.DEACTIVATED){
- MessageConsumer consumer=(MessageConsumer) consumerMap.remove(componentName);
- if(consumer!=null){
- consumer.close();
- }
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
+ topicProducer.send(msg);
}
- // broadcast change to the network
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- topicProducer.send(msg);
- log.info("broadcast to internal JMS network: " + event);
}
catch (JMSException e) {
log.error("failed to broadcast to the internal JMS network: " + event, e);
@@ -306,17 +312,30 @@
public void onEvent(ConsumerAdvisoryEvent event) {
if (started.get()) {
ConsumerInfo info = event.getInfo();
- if (info.isStarted()) {
- for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
- LocalComponentConnector lcc = (LocalComponentConnector) i.next();
- ComponentPacket packet = lcc.getPacket();
- ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
- onEvent(cpe);
+ if (!broker.getContainerName().equals(info.getClientId())) {
+ if (info.isStarted()) {
+ log.info(broker.getContainerName() + ": new node discovered " + info.getClientId());
+ // The new node is started, so send it all components state
+ try {
+ String destination = INBOUND_PREFIX + info.getClientId();
+ Queue queue = inboundSession.createQueue(destination);
+ for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
+ LocalComponentConnector lcc = (LocalComponentConnector) i.next();
+ ComponentPacket packet = lcc.getPacket();
+ ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
+ ObjectMessage msg = inboundSession.createObjectMessage(cpe);
+ log.info(broker.getContainerName() + ": sending info to " + info.getClientId() + " for " + lcc.getComponentNameSpace());
+ queueProducer.send(queue, msg);
+ }
+ } catch (JMSException e) {
+ log.error("failed to broadcast to the internal JMS network: " + event, e);
+ }
}
+ else {
+ log.info(broker.getContainerName() + ": node " + info.getClientId() + " has stopped");
+ removeAllPackets(info.getClientId());
+ }
}
- else {
- removeAllPackets(info.getClientId());
- }
}
}
@@ -368,7 +387,7 @@
*/
public void onMessage(Message message) {
try {
- if (message != null && message instanceof ObjectMessage) {
+ if (started.get() && message != null && message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
Object obj = objMsg.getObject();
if (obj != null) {
@@ -404,33 +423,18 @@
int eventStatus = event.getStatus();
switch (eventStatus) {
case ComponentPacketEvent.ACTIVATED:
- addRemotePacket(containerName, packet);
+ case ComponentPacketEvent.STATE_CHANGE:
+ updateRemotePacket(containerName, packet);
break;
case ComponentPacketEvent.DEACTIVATED:
removeRemotePacket(containerName, packet);
break;
- case ComponentPacketEvent.STATE_CHANGE:
- updateRemotePacket(containerName, packet);
- break;
default:
log.warn("Unable to determine ComponentPacketEvent type: " + eventStatus + " for packet: " + packet);
}
}
}
- private void addRemotePacket(String containerName, ComponentPacket packet) {
- networkComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
- Set set = (Set) networkNodeKeyMap.get(containerName);
- if (set == null) {
- set = new CopyOnWriteArraySet();
- networkNodeKeyMap.put(containerName, set);
- }
- ComponentConnector cc = new ComponentConnector(packet);
- log.info("Adding Remote Component: " + cc);
- broker.getRegistry().addRemoteComponentConnector(cc);
- set.add(packet);
- }
-
private void updateRemotePacket(String containerName, ComponentPacket packet) {
Set set = (Set) networkNodeKeyMap.get(containerName);
if (set != null) {
@@ -438,7 +442,7 @@
set.add(packet);
}
ComponentConnector cc = new ComponentConnector(packet);
- log.info("Updating remote Component: " + cc);
+ log.info(broker.getContainerName() + ": updating remote component: " + cc);
broker.getRegistry().updateRemoteComponentConnector(cc);
}
@@ -448,7 +452,7 @@
if (set != null) {
set.remove(packet);
ComponentConnector cc = new ComponentConnector(packet);
- log.info("Removing remote Component: " + cc);
+ log.info(broker.getContainerName() + ": removing remote component: " + cc);
broker.getRegistry().removeRemoteComponentConnector(cc);
if (set.isEmpty()) {
networkNodeKeyMap.remove(containerName);
@@ -462,7 +466,7 @@
for (Iterator i = set.iterator();i.hasNext();) {
ComponentPacket packet = (ComponentPacket) i.next();
ComponentConnector cc = new ComponentConnector(packet);
- log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
+ log.info(broker.getContainerName() + ": Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
broker.getRegistry().removeRemoteComponentConnector(cc);
networkComponentKeyMap.remove(packet.getComponentNameSpace());
}