| Commit in servicemix/base/src/main/java/org/servicemix/jbi/nmr on MAIN | |||
| Broker.java | +40 | -2 | 1.25 -> 1.26 |
Fix SM-81 and a bug which caused all routing fails
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.25 -r1.26 --- Broker.java 6 Oct 2005 16:35:33 -0000 1.25 +++ Broker.java 7 Oct 2005 08:26:36 -0000 1.26 @@ -19,8 +19,12 @@
package org.servicemix.jbi.nmr;
+import java.util.ArrayList; +import java.util.List; +
import javax.jbi.JBIException; import javax.jbi.component.Component;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException; import javax.jbi.servicedesc.ServiceEndpoint; import javax.management.JMException;
@@ -54,7 +58,7 @@
/** * The Broker handles Nomalised Message Routing within ServiceMix *
- * @version $Revision: 1.25 $
+ * @version $Revision: 1.26 $
*/
public class Broker extends BaseLifeCycle {
private JBIContainer container;
@@ -280,6 +284,7 @@
// check to see if there is a match on the serviceName
if (serviceName != null) {
ServiceEndpoint[] endpoints = registry.getEndpointsForService(serviceName);
+ endpoints = filterEndpoints(endpoints, exchange);
// check against name (if set)
// theEndpoint =
// matchEndpointByName(endpoints,endpointName);
@@ -307,6 +312,7 @@
}
if (theEndpoint == null && !remoteDestination && interfaceName != null) {
ServiceEndpoint[] endpoints = registry.getEndpoints(interfaceName);
+ endpoints = filterEndpoints(endpoints, exchange);
// check against name (if set)
// theEndpoint =
// matchEndpointByName(endpoints,endpointName);
@@ -366,7 +372,7 @@
}
if (packet.isOutbound()) {
- foundRoute = getSubscriptionManager().dispatchToSubscribers(exchange);
+ foundRoute |= getSubscriptionManager().dispatchToSubscribers(exchange);
}
if (!foundRoute){
@@ -383,6 +389,38 @@
}
/**
+ * Filter the given endpoints by asking to the components
+ * if they are ok to process the exchange.
+ *
+ * @param endpoints
+ * @param exchange
+ * @return
+ */
+ protected ServiceEndpoint[] filterEndpoints(ServiceEndpoint[] endpoints, MessageExchangeImpl exchange) {
+ List filtered = new ArrayList();
+ for (int i = 0; i < endpoints.length; i++) {
+ ComponentNameSpace id = ((ServiceEndpointImpl) endpoints[i]).getComponentNameSpace();
+ LocalComponentConnector lcc = getRegistry().getLocalComponentConnector(id);
+ // If this is a local component
+ if (lcc != null) {
+ if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+ if (lcc.getComponent().isExchangeWithConsumerOkay(endpoints[i], exchange)) {
+ filtered.add(endpoints[i]);
+ }
+ } else {
+ if (lcc.getComponent().isExchangeWithProviderOkay(endpoints[i], exchange)) {
+ filtered.add(endpoints[i]);
+ }
+ }
+ // If the component is not local, we can not filter
+ } else {
+ filtered.add(endpoints[i]);
+ }
+ }
+ return (ServiceEndpoint[]) filtered.toArray(new ServiceEndpoint[filtered.size()]);
+ }
+
+ /**
* @return the default EndpointChooser
*/
public EndpointChooser getDefaultInterfaceChooser() {
