| Commit in servicemix/base/src/main/java/org/servicemix/jbi on MAIN | | | |
| nmr/SubscriptionManager.java | +1 | | 1.2 -> 1.3 |
| framework/SubscriptionRegistry.java | +6 | -5 | 1.1 -> 1.2 |
| framework/Registry.java | +2 | -1 | 1.6 -> 1.7 |
| container/SubscriptionSpec.java | +38 | -11 | 1.3 -> 1.4 |
| | +47 | -17 | |
Finished the implementation of subscription semantics (SM-35).
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.2 -r1.3 --- SubscriptionManager.java 22 Sep 2005 12:22:19 -0000 1.2 +++ SubscriptionManager.java 22 Sep 2005 18:17:58 -0000 1.3 @@ -80,6 +80,7 @@
protected void dispatchToSubscriber(MessageExchangeImpl exchange, ServiceEndpointImpl endpoint) throws JBIException {
ExchangePacket packet = exchange.getPacket().copy();
packet.setEndpoint(endpoint);
+ packet.setDestinationId(endpoint.getComponentNameSpace());
flow.send(packet);
}
servicemix/base/src/main/java/org/servicemix/jbi/framework
diff -u -r1.1 -r1.2 --- SubscriptionRegistry.java 22 Sep 2005 12:22:19 -0000 1.1 +++ SubscriptionRegistry.java 22 Sep 2005 18:17:58 -0000 1.2 @@ -33,20 +33,22 @@
* Maintains a registry of the applicable subscriptions currently active for the * current components *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SubscriptionRegistry {
private Map subscriptions = new ConcurrentHashMap();
-
+ private Registry registry;
+ public void init(Registry registry) {
+ this.registry = registry;
+ }
/**
* @param subscription
* @param endpoint
*/
public void registerSubscription(SubscriptionSpec subscription, ServiceEndpointImpl endpoint) {
- System.out.println("REGISTER SUB: " + subscription);
subscriptions.put(subscription, endpoint);
}
@@ -69,8 +71,7 @@
Map.Entry entry = (Map.Entry) iter.next();
SubscriptionSpec subscription = (SubscriptionSpec) entry.getKey();
-System.out.println("TESTING SUB -" + subscription + " MATCH IS TRUE = " + subscription.matches(exchange));
- if (subscription.matches(exchange)) {
+ if (subscription.matches(registry,exchange)) {
if (result == null) {
result = new ArrayList();
}
servicemix/base/src/main/java/org/servicemix/jbi/framework
diff -u -r1.6 -r1.7 --- Registry.java 22 Sep 2005 12:22:19 -0000 1.6 +++ Registry.java 22 Sep 2005 18:17:58 -0000 1.7 @@ -47,7 +47,7 @@
/** * Registry - state infomation including running state, SA's deployed etc. *
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class Registry extends BaseLifeCycle {
private final static Log log = LogFactory.getLog(Registry.class);
@@ -85,6 +85,7 @@
public void init(JBIContainer container) throws JBIException {
this.container = container;
this.componentRegistry.setContainerName(container.getName());
+ this.subscriptionRegistry.init(this);
}
/**
servicemix/base/src/main/java/org/servicemix/jbi/container
diff -u -r1.3 -r1.4 --- SubscriptionSpec.java 22 Sep 2005 12:22:19 -0000 1.3 +++ SubscriptionSpec.java 22 Sep 2005 18:17:58 -0000 1.4 @@ -17,13 +17,17 @@
**/ package org.servicemix.jbi.container;
+import org.servicemix.jbi.framework.ComponentNameSpace; +import org.servicemix.jbi.framework.Registry;
import org.servicemix.jbi.messaging.ExchangePacket; import org.servicemix.jbi.messaging.MessageExchangeImpl; import org.servicemix.jbi.resolver.EndpointFilter; import org.servicemix.jbi.resolver.SubscriptionFilter;
+import org.servicemix.jbi.servicedesc.ServiceEndpointImpl;
import java.io.Serializable;
+import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName; /**
@@ -89,21 +93,44 @@
* @param exchange the exchange to be matched
* @return true if this subscription matches the exchange
*/
- public boolean matches(MessageExchangeImpl exchange) {
+ public boolean matches(Registry registry,MessageExchangeImpl exchange){
boolean result = false;
-
- if (filter != null) {
+
+ if(filter != null){
result = filter.matches(exchange);
- }else {
+ }else{
ExchangePacket packet = exchange.getPacket();
- QName exchangeServiceName = packet.getServiceName();
- QName exchangeInterfaceName = packet.getInterfaceName();
- if (service != null && exchangeServiceName != null) {
- result = service.equals(exchangeServiceName);
- }
- if (interfaceName != null && exchangeInterfaceName != null){
- result &= interfaceName.equals(exchangeInterfaceName);
+ ComponentNameSpace sourceId = packet.getSourceId();
+ if(sourceId != null){
+ // get the list of services
+ if(service != null){
+ ServiceEndpoint[] ses = registry.getEndpointsForService(service);
+ if(ses != null){
+ for(int i = 0;i < ses.length;i++){
+ ServiceEndpointImpl se = (ServiceEndpointImpl) ses[i];
+ if(se.getComponentNameSpace() != null && se.getComponentNameSpace().equals(sourceId)){
+ result = true;
+ break;
+ }
+ }
+ }
+ }
+ if(result && interfaceName != null){
+ ServiceEndpoint[] ses = registry.getEndpoints(interfaceName);
+ if(ses != null){
+ result = false;
+ for(int i = 0;i < ses.length;i++){
+ ServiceEndpointImpl se = (ServiceEndpointImpl) ses[i];
+ if(se.getComponentNameSpace() != null && se.getComponentNameSpace().equals(sourceId)){
+ result = true;
+ break;
+ }
+ }
+ }
+ }
+
}
+
}
return result;
}
