Title: [719] trunk/core/src/main/java/org/servicemix/jbi/framework: added round-robin load balancing for cluster + jms flows
- Revision
- 719
- Author
- rajdavies
- Date
- 2005-11-02 05:52:43 -0500 (Wed, 02 Nov 2005)
Log Message
added round-robin load balancing for cluster + jms flows
Modified Paths
Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/framework/ComponentRegistry.java (718 => 719)
--- trunk/core/src/main/java/org/servicemix/jbi/framework/ComponentRegistry.java 2005-11-02 03:23:36 UTC (rev 718)
+++ trunk/core/src/main/java/org/servicemix/jbi/framework/ComponentRegistry.java 2005-11-02 10:52:43 UTC (rev 719)
@@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
@@ -42,6 +43,7 @@
private Map componentByNameMap = new ConcurrentHashMap();
private Map idMap = new ConcurrentHashMap();
private Map localIdMap = new ConcurrentHashMap();
+ private Map loadBalancedComponentMap = new ConcurrentHashMap();
/**
@@ -67,6 +69,7 @@
localIdMap.put(name, result);
addComponentConnector(result);
componentByNameMap.put(name, component);
+ addToLoadBalancedMap(name);
}
return result;
}
@@ -82,6 +85,7 @@
removeComponentConnector(result.getComponentNameSpace());
localIdMap.remove(result.getComponentNameSpace());
componentByNameMap.remove(result.getComponentNameSpace());
+ removeFromLoadBalancedMap(result.getComponentNameSpace());
}
return result;
}
@@ -121,6 +125,15 @@
}
/**
+ * For distributed containers, get a ComponentConnector by round-robin
+ * @param id
+ * @return the ComponentConnector or null
+ */
+ public ComponentConnector getLoadBalancedComponentConnector(ComponentNameSpace id){
+ return getComponentConnector(getLoadBalancedComponentName(id));
+ }
+
+ /**
* Add a ComponentConnector
* @param connector
*/
@@ -197,4 +210,36 @@
public void setContainerName(String containerName) {
this.containerName = containerName;
}
+
+ private synchronized void addToLoadBalancedMap(ComponentNameSpace cns){
+ String key = cns.getName();
+ LinkedList list = (LinkedList)loadBalancedComponentMap.get(key);
+ if (list == null){
+ list = new LinkedList();
+ }
+ list.add(cns);
+
+ }
+
+ private synchronized void removeFromLoadBalancedMap(ComponentNameSpace cns){
+ String key = cns.getName();
+ LinkedList list = (LinkedList)loadBalancedComponentMap.get(key);
+ if (list != null){
+ list.remove(cns);
+ if (list.isEmpty()){
+ loadBalancedComponentMap.remove(key);
+ }
+ }
+ }
+
+ private synchronized ComponentNameSpace getLoadBalancedComponentName(ComponentNameSpace cns){
+ ComponentNameSpace result = null;
+ String key = cns.getName();
+ LinkedList list = (LinkedList)loadBalancedComponentMap.get(key);
+ if (list != null && !list.isEmpty()){
+ result = (ComponentNameSpace) list.removeFirst();
+ list.addLast(result);
+ }
+ return result;
+ }
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java (718 => 719)
--- trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java 2005-11-02 03:23:36 UTC (rev 718)
+++ trunk/core/src/main/java/org/servicemix/jbi/framework/Registry.java 2005-11-02 10:52:43 UTC (rev 719)
@@ -345,6 +345,16 @@
public ComponentConnector getComponentConnector(ComponentNameSpace id) {
return componentRegistry.getComponentConnector(id);
}
+
+
+ /**
+ * For distributed containers, get a ComponentConnector by round-robin
+ * @param id
+ * @return the ComponentConnector or null
+ */
+ public ComponentConnector getLoadBalancedComponentConnector(ComponentNameSpace id){
+ return componentRegistry.getLoadBalancedComponentConnector(id);
+ }
/**
* Add a ComponentConnector to ComponentRegistry Should be called for adding remote ComponentConnectors from other
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/cluster/ClusterFlow.java (718 => 719)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/cluster/ClusterFlow.java 2005-11-02 03:23:36 UTC (rev 718)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/cluster/ClusterFlow.java 2005-11-02 10:52:43 UTC (rev 719)
@@ -284,7 +284,7 @@
*/
public void doRouting(MessageExchangeImpl me) throws MessagingException {
ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
- ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
+ ComponentConnector cc = broker.getRegistry().getLoadBalancedComponentConnector(id);
if (cc != null) {
if (cc.isLocal()) {
super.doRouting(me);
Modified: trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java (718 => 719)
--- trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-02 03:23:36 UTC (rev 718)
+++ trunk/core/src/main/java/org/servicemix/jbi/nmr/flow/jms/JMSFlow.java 2005-11-02 10:52:43 UTC (rev 719)
@@ -338,7 +338,7 @@
*/
public void doRouting(MessageExchangeImpl me) throws MessagingException {
ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
- ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
+ ComponentConnector cc = broker.getRegistry().getLoadBalancedComponentConnector(id);
if (cc != null) {
if (cc.isLocal()) {
super.doRouting(me);