Author: isilval
Date: Wed Sep 5 14:00:42 2007
New Revision: 573053
URL: http://svn.apache.org/viewvc?rev=573053&view=rev
Log:
Move subscriber list and broker ID state from NotificationReferenceBindingInvoker to NotificationReferenceBindingProvider
Modified:
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
Modified:
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java?rev=573053&r1=573052&r2=573053&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
(original)
+++
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
Wed Sep 5 14:00:42 2007
@@ -19,16 +19,11 @@
package org.apache.tuscany.sca.binding.notification;
import java.io.OutputStream;
-import java.net.URL;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import org.apache.axiom.om.OMElement;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.NotificationReferenceBindingProvider.SubscriberInfo;
import org.apache.tuscany.sca.binding.notification.encoding.Constants;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
import org.apache.tuscany.sca.binding.notification.util.IOUtils;
import
org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
@@ -47,21 +42,12 @@
private static final Message RESPONSE = new ImmutableMessage();
private Operation operation;
- private List<SubscriberInfo> subscribers;
- private String brokerID;
+ private NotificationReferenceBindingProvider
notificationReferenceBindingProvider;
- public NotificationReferenceBindingInvoker(Operation operation) {
+ public NotificationReferenceBindingInvoker(Operation operation,
+
NotificationReferenceBindingProvider notificationReferenceBindingProvider) {
this.operation = operation;
- this.subscribers = new ArrayList<SubscriberInfo>();
- this.brokerID = null;
- }
-
- public void setBrokerID(String brokerID) {
- this.brokerID = brokerID;
- }
-
- public String getBrokerID() {
- return brokerID;
+ this.notificationReferenceBindingProvider =
notificationReferenceBindingProvider;
}
public Message invoke(Message msg) {
@@ -90,13 +76,14 @@
try {
synchronized(this) {
- for (SubscriberInfo subscriber : subscribers) {
+ for (SubscriberInfo subscriber :
notificationReferenceBindingProvider.getSubscribers()) {
// check for each subscriber's broker id and skip if equal
if (incomingBrokerID != null && subscriber.brokerID !=
null && incomingBrokerID.equals(subscriber.brokerID)) {
continue;
}
HashMap<String, String> headers = new HashMap<String,
String>();
headers.put(IOUtils.Notification_Operation,
operation.getName());
+ String brokerID =
notificationReferenceBindingProvider.getBrokerID();
if (brokerID != null) {
headers.put(Constants.Broker_ID, brokerID);
}
@@ -142,103 +129,5 @@
}
};
return writeable;
- }
-
- public void addSubscriberUrl(URL subscriberUrl) {
- addSubscriber(subscriberUrl, null);
- }
-
- public void addSubscriber(EndpointReference subscriberEPR) {
- BrokerID brokerID = null;
- if (subscriberEPR.getReferenceProperties() != null) {
- brokerID =
subscriberEPR.getReferenceProperties().getProperty(BrokerID.class);
- }
- addSubscriber(subscriberEPR.getEndpointAddress().getAddress(),
(brokerID != null ? brokerID.getID() : null));
- }
-
- public void addSubscriber(URL address, String brokerID) {
- synchronized(this) {
- SubscriberInfo si = new SubscriberInfo(address);
- si.brokerID = brokerID;
- if (subscribers == null) {
- subscribers = new ArrayList<SubscriberInfo>();
- }
- subscribers.add(si);
- }
- }
-
- public void replaceSubscribers(EndpointReference brokerConsumerEPR) {
- synchronized(this) {
- subscribers = null;
- }
- addSubscriber(brokerConsumerEPR);
- }
-
- public void replaceBrokerSubscriber(URL removedBrokerConsumerUrl,
EndpointReference chosenBrokerConsumerEpr) {
- synchronized(this) {
- if (subscribers == null) {
- throw new RuntimeException("No subscribers");
- }
- SubscriberInfo siToRemove = null;
- for (SubscriberInfo si : subscribers) {
- if (si.address.equals(removedBrokerConsumerUrl)) {
- siToRemove = si;
- }
- }
- if (siToRemove == null) {
-                throw new RuntimeException("Can't find info for broker to remove [" + removedBrokerConsumerUrl + "]");
- }
- if (!subscribers.remove(siToRemove)) {
- throw new RuntimeException("Can't remove info for [" +
siToRemove.address + "]");
- }
- }
- if (chosenBrokerConsumerEpr != null) {
- addSubscriber(chosenBrokerConsumerEpr);
- }
- }
-
- public List<EndpointReference> getNeighborBrokerConsumerEprs() {
- synchronized(this) {
- if (subscribers == null) {
- throw new RuntimeException("No subscribers");
- }
- List<EndpointReference> neighborBrokerConsumerEprs = new
ArrayList<EndpointReference>();
- for(SubscriberInfo si : subscribers) {
- if (si.brokerID != null) {
-
neighborBrokerConsumerEprs.add(EncodingUtils.createEndpointReference(si.address,
si.brokerID));
- }
- }
-
- return neighborBrokerConsumerEprs;
- }
- }
-
- public void removeBrokerSubscribers() {
- synchronized(this) {
- if (subscribers == null) {
- throw new RuntimeException("No subscribers");
- }
- List<SubscriberInfo> sisToRemove = new ArrayList<SubscriberInfo>();
- for (SubscriberInfo si : subscribers) {
- if (si.brokerID != null) {
- sisToRemove.add(si);
- }
- }
- for(SubscriberInfo si : sisToRemove) {
- if (!subscribers.remove(si)) {
-                    throw new RuntimeException("Can't remove broker subscriber [" + si.address + "]");
- }
- }
- }
- }
-
- class SubscriberInfo {
- public URL address;
- public String brokerID;
-
- public SubscriberInfo(URL address) {
- this.address = address;
- this.brokerID = null;
- }
}
}
Modified:
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java?rev=573053&r1=573052&r2=573053&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
(original)
+++
incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
Wed Sep 5 14:00:42 2007
@@ -29,6 +29,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.tuscany.sca.binding.notification.encoding.Broker;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride;
import org.apache.tuscany.sca.binding.notification.encoding.Constants;
import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject;
@@ -70,6 +71,9 @@
private boolean started;
private NotificationBrokerManager brokerManager;
+ private List<SubscriberInfo> subscribers;
+ private String brokerID;
+
public NotificationReferenceBindingProvider(NotificationBinding
notificationBinding,
RuntimeComponent component,
RuntimeComponentReference
reference,
@@ -107,6 +111,9 @@
for (Operation operation : interfaze.getOperations()) {
operation.setNonBlocking(false);
}
+
+ this.subscribers = new ArrayList<SubscriberInfo>();
+ this.brokerID = null;
}
public NotificationBinding getBinding() {
@@ -120,17 +127,33 @@
public boolean isStarted() {
return started;
}
+
+ public void setBrokerID(String brokerID) {
+ this.brokerID = brokerID;
+ }
+
+ public String getBrokerID() {
+ return brokerID;
+ }
public Invoker createInvoker(Operation operation, boolean isCallback) {
if (isCallback) {
throw new UnsupportedOperationException();
}
+ return createInvoker(operation);
+ }
+
+ public Invoker createInvoker(Operation operation) {
if (invoker == null) {
- invoker = new NotificationReferenceBindingInvoker(operation);
+ invoker = new NotificationReferenceBindingInvoker(operation, this);
}
return invoker;
}
+ public boolean supportsAsyncOneWayInvocation() {
+ return false;
+ }
+
public InterfaceContract getBindingInterfaceContract() {
return reference.getInterfaceContract();
}
@@ -157,13 +180,13 @@
}
if (Constants.EndConsumers.equals(sequenceType)) {
for (URL consumerUrl : consumerList) {
- invoker.addSubscriberUrl(consumerUrl);
+ addSubscriberUrl(consumerUrl);
}
}
else if (Constants.BrokerConsumers.equals(sequenceType)) {
// Pick a broker consumer, for now the first one
URL consumerUrl = consumerList.get(0);
- invoker.addSubscriberUrl(consumerUrl);
+ addSubscriberUrl(consumerUrl);
}
servletHost.addServletMapping(myUrl.toString(), new
NotificationServlet(this));
@@ -171,21 +194,21 @@
public void deployBroker(String brokerID, EndpointReference
brokerConsumerEPR, List<EndpointReference> consumerList) {
if (brokerConsumerEPR != null) {
- invoker.addSubscriber(brokerConsumerEPR);
+ addSubscriber(brokerConsumerEPR);
}
if (consumerList != null && !consumerList.isEmpty()) {
for (EndpointReference consumerEPR : consumerList) {
- invoker.addSubscriber(consumerEPR);
+ addSubscriber(consumerEPR);
}
}
- invoker.setBrokerID(brokerID);
+ setBrokerID(brokerID);
servletHost.addServletMapping(myUrl.toString(), new
NotificationServlet(this));
}
public void undeployBroker(URL brokerConsumerUrl) {
- EndpointReference brokerConsumerEpr =
EncodingUtils.createEndpointReference(brokerConsumerUrl, invoker.getBrokerID());
- ntm.removeBroker(brokerConsumerEpr,
invoker.getNeighborBrokerConsumerEprs(), remoteNtmUrl);
- invoker.removeBrokerSubscribers();
+ EndpointReference brokerConsumerEpr =
EncodingUtils.createEndpointReference(brokerConsumerUrl, getBrokerID());
+ ntm.removeBroker(brokerConsumerEpr, getNeighborBrokerConsumerEprs(),
remoteNtmUrl);
+ removeBrokerSubscribers();
}
public void handle(Map<String, String> headers, ServletInputStream
istream, int contentLength, ServletOutputStream ostream) {
@@ -194,11 +217,11 @@
EncodingObject eo =
EncodingUtils.decodeFromStream(encodingRegistry, istream);
if (eo instanceof Subscribe) {
Subscribe sub = (Subscribe)eo;
-
invoker.addSubscriber(sub.getConsumerReference().getReference());
+ addSubscriber(sub.getConsumerReference().getReference());
}
else if (eo instanceof ConnectionOverride) {
ConnectionOverride co = (ConnectionOverride)eo;
-
invoker.replaceSubscribers(co.getBrokerConsumerReference().getReference());
+
replaceSubscribers(co.getBrokerConsumerReference().getReference());
}
else if (eo instanceof ReplaceBrokerConnection) {
ReplaceBrokerConnection rbc = (ReplaceBrokerConnection)eo;
@@ -206,13 +229,13 @@
if (rbc.getNeighbors() != null) {
int choice = rbc.getNeighbors().getBrokerSequence().size()
- 1;
Broker chosenBroker =
rbc.getNeighbors().getBrokerSequence().get(choice);
- invoker.replaceBrokerSubscriber(removedBrokerConsumerEpr,
+ replaceBrokerSubscriber(removedBrokerConsumerEpr,
chosenBroker.getBrokerConsumerReference().getReference());
brokerManager.replaceConsumersBrokerConnection(notificationType,
chosenBroker.getBrokerProducerReference().getReference());
}
else {
- invoker.replaceBrokerSubscriber(removedBrokerConsumerEpr,
null);
+ replaceBrokerSubscriber(removedBrokerConsumerEpr, null);
}
}
else {
@@ -221,6 +244,108 @@
} catch(Throwable e) {
e.printStackTrace();
throw new RuntimeException(e);
+ }
+ }
+
+ public List<SubscriberInfo> getSubscribers() {
+ return subscribers;
+ }
+
+ private void addSubscriberUrl(URL subscriberUrl) {
+ addSubscriber(subscriberUrl, null);
+ }
+
+ private void addSubscriber(EndpointReference subscriberEPR) {
+ BrokerID brokerID = null;
+ if (subscriberEPR.getReferenceProperties() != null) {
+ brokerID =
subscriberEPR.getReferenceProperties().getProperty(BrokerID.class);
+ }
+ addSubscriber(subscriberEPR.getEndpointAddress().getAddress(),
(brokerID != null ? brokerID.getID() : null));
+ }
+
+ private void addSubscriber(URL address, String brokerID) {
+ synchronized(this) {
+ SubscriberInfo si = new SubscriberInfo(address);
+ si.brokerID = brokerID;
+ if (subscribers == null) {
+ subscribers = new ArrayList<SubscriberInfo>();
+ }
+ subscribers.add(si);
+ }
+ }
+
+ private void replaceSubscribers(EndpointReference brokerConsumerEPR) {
+ synchronized(this) {
+ subscribers = null;
+ }
+ addSubscriber(brokerConsumerEPR);
+ }
+
+ private void replaceBrokerSubscriber(URL removedBrokerConsumerUrl,
EndpointReference chosenBrokerConsumerEpr) {
+ synchronized(this) {
+ if (subscribers == null) {
+ throw new RuntimeException("No subscribers");
+ }
+ SubscriberInfo siToRemove = null;
+ for (SubscriberInfo si : subscribers) {
+ if (si.address.equals(removedBrokerConsumerUrl)) {
+ siToRemove = si;
+ }
+ }
+ if (siToRemove == null) {
+                throw new RuntimeException("Can't find info for broker to remove [" + removedBrokerConsumerUrl + "]");
+ }
+ if (!subscribers.remove(siToRemove)) {
+ throw new RuntimeException("Can't remove info for [" +
siToRemove.address + "]");
+ }
+ }
+ if (chosenBrokerConsumerEpr != null) {
+ addSubscriber(chosenBrokerConsumerEpr);
+ }
+ }
+
+ private List<EndpointReference> getNeighborBrokerConsumerEprs() {
+ synchronized(this) {
+ if (subscribers == null) {
+ throw new RuntimeException("No subscribers");
+ }
+ List<EndpointReference> neighborBrokerConsumerEprs = new
ArrayList<EndpointReference>();
+ for(SubscriberInfo si : subscribers) {
+ if (si.brokerID != null) {
+
neighborBrokerConsumerEprs.add(EncodingUtils.createEndpointReference(si.address,
si.brokerID));
+ }
+ }
+
+ return neighborBrokerConsumerEprs;
+ }
+ }
+
+ private void removeBrokerSubscribers() {
+ synchronized(this) {
+ if (subscribers == null) {
+ throw new RuntimeException("No subscribers");
+ }
+ List<SubscriberInfo> sisToRemove = new ArrayList<SubscriberInfo>();
+ for (SubscriberInfo si : subscribers) {
+ if (si.brokerID != null) {
+ sisToRemove.add(si);
+ }
+ }
+ for(SubscriberInfo si : sisToRemove) {
+ if (!subscribers.remove(si)) {
+                    throw new RuntimeException("Can't remove broker subscriber [" + si.address + "]");
+ }
+ }
+ }
+ }
+
+ class SubscriberInfo {
+ public URL address;
+ public String brokerID;
+
+ public SubscriberInfo(URL address) {
+ this.address = address;
+ this.brokerID = null;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]