| Commit in servicemix/base on MAIN | |||
| src/test/resources/org/servicemix/components/jms/example-endpoint-subscription.xml | +82 | added 1.1 | |
| project.xml | +3 | 1.85 -> 1.86 | |
| src/main/java/org/servicemix/jbi/nmr/Broker.java | +88 | -67 | 1.20 -> 1.21 |
| src/test/java/org/servicemix/components/jms/JmsBindingUsingEndpointSubscriptionTest.java | +40 | added 1.1 | |
| +213 | -67 | ||
added test case for component subscription model, which is currently excluded due to it not being implemented yet :)
servicemix/base/src/test/resources/org/servicemix/components/jms
example-endpoint-subscription.xml added at 1.1
diff -N example-endpoint-subscription.xml --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ example-endpoint-subscription.xml 20 Sep 2005 17:38:15 -0000 1.1 @@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?> +<beans xmlns:foo="http://servicemix.org/cheese/"> + + <!-- the JBI container --> + <container id="jbi"> + <components> + + <component id="jmsSender" service="foo:jmsSender" + class="org.servicemix.components.jms.JmsSenderComponent"> + <property name="template"> + <bean class="org.springframework.jms.core.JmsTemplate"> + <property name="connectionFactory"> + <ref local="jmsFactory" /> + </property> + <property name="defaultDestinationName" + value="test.org.servicemix.example.1" /> + <property name="pubSubDomain" value="true" /> + </bean> + </property> + </component> + + <!-- START SNIPPET: publish --> + <component id="jmsReceiver" service="foo:jmsReceiver" + failIfNoDestinationEndpoint="false" + class="org.servicemix.components.jms.JmsReceiverComponent"> + <!-- END SNIPPET: publish --> + <property name="template"> + <bean class="org.springframework.jms.core.JmsTemplate"> + <property name="connectionFactory"> + <ref local="jmsFactory" /> + </property> + <property name="defaultDestinationName" + value="test.org.servicemix.example.1" /> + <property name="pubSubDomain" value="true" /> + </bean> + </property> + </component> + + + <!-- START SNIPPET: subscribe --> + <component id="receiver" service="foo:receiver" + class="org.servicemix.examples.ReceiverComponent"> + + <!-- lets subscribe to the output of the foo:jmsReceiver component --> + <subscribe service="foo:jmsReceiver" /> + </component> + <!-- END SNIPPET: subscribe --> + + + <component id="trace" service="foo:trace" + class="org.servicemix.components.groovy.GroovyComponent"> + <subscribe service="foo:jmsReceiver" /> + + <property name="disableOutput" value="true" /> + <property name="scriptText"> + <value> + <![CDATA[ +log.info "Received message $inMessage with body: $inMessage.bodyText" + ]]> + </value> + </property> + </component> + </components> + </container> + + <bean id="client" + class="org.servicemix.client.DefaultServiceMixClient"> + <constructor-arg ref="jbi" /> + </bean> + + <bean id="jmsFactory" + class="org.activemq.pool.PooledConnectionFactory"> + <property name="connectionFactory"> + <bean class="org.activemq.ActiveMQConnectionFactory"> + <property name="brokerURL"> + <value>vm://localhost</value> + </property> + </bean> + </property> + </bean> + +</beans>
servicemix/base
diff -u -r1.85 -r1.86 --- project.xml 20 Sep 2005 11:00:30 -0000 1.85 +++ project.xml 20 Sep 2005 17:38:15 -0000 1.86 @@ -1003,6 +1003,9 @@
<exclude>**/ScriptTest.*</exclude>
<exclude>**/ReflectionBindingTest.*</exclude>
+ <!-- TODO temporary disabled until subscriptions are enabled --> + <exclude>**/JmsBindingUsingEndpointSubscriptionTest.*</exclude> +
<!-- need a HTT test server... -->
<exclude>**/HttpTest.*</exclude>
servicemix/base/src/main/java/org/servicemix/jbi/nmr
diff -u -r1.20 -r1.21 --- Broker.java 20 Sep 2005 16:52:24 -0000 1.20 +++ Broker.java 20 Sep 2005 17:38:15 -0000 1.21 @@ -54,10 +54,10 @@
/** * The Broker handles Nomalised Message Routing within ServiceMix
- * - * @version $Revision: 1.20 $
+ * + * @version $Revision: 1.21 $
*/
-public class Broker extends BaseLifeCycle {
+public class Broker extends BaseLifeCycle {
private JBIContainer container;
private Registry registry;
private String flowName = "seda";
@@ -73,15 +73,16 @@
*/
public Broker() {
}
-
+
/**
* Get the description
+ *
* @return description
*/
- public String getDescription(){
+ public String getDescription() {
return "Normalized Message Router";
}
-
+
/**
* @return Returns the workManager.
*/
@@ -90,11 +91,12 @@
}
/**
- * @param workManager The workManager to set.
+ * @param workManager + * The workManager to set.
*/
public void setWorkManager(WorkManager workManager) {
this.workManager = workManager;
- }
+ }
public SubscriptionManager getSubscriptionManager() {
return subscriptionManager;
@@ -109,6 +111,7 @@
/**
* initialize the broker
+ *
* @param container
* @throws JBIException
*/
@@ -122,103 +125,113 @@
this.flow.init(this);
getSubscriptionManager().init(this);
}
-
+
/**
* Get the name of the Container
+ *
* @return containerName
*/
- public String getContainerName(){
+ public String getContainerName() {
return container.getName();
}
-
+
/**
* Get the ManagementContext
+ *
* @return the managementContext
*/
- public ManagementContext getManagementContext(){
+ public ManagementContext getManagementContext() {
return container.getManagementContext();
}
-
+
/**
* Get the Registry
+ *
* @return the registry
*/
- public Registry getRegistry(){
+ public Registry getRegistry() {
return registry;
}
+
/**
* start brokering
+ *
* @throws JBIException
*/
- public void start() throws JBIException{
+ public void start() throws JBIException {
flow.start();
super.start();
}
-
+
/**
* stop brokering
+ *
* @throws JBIException
*/
- public void stop() throws JBIException{
+ public void stop() throws JBIException {
flow.stop();
super.stop();
}
-
+
/**
* shutdown all Components
+ *
* @throws JBIException
*/
- public void shutDown() throws JBIException{
+ public void shutDown() throws JBIException {
stop();
flow.shutDown();
super.shutDown();
}
-
+
/**
* @return Returns the flow.
*/
public String getFlowName() {
return flowName;
}
+
/**
- * @param flowName The flow to set.
+ * @param flowName + * The flow to set.
*/
public void setFlowName(String flowName) {
this.flowName = flowName;
}
-
+
/**
* Set the flow
+ *
* @param flow
*/
- public void setFlow(Flow flow){
+ public void setFlow(Flow flow) {
this.flow = flow;
}
-
+
/**
* @return the Flow
*/
- public Flow getFlow(){
+ public Flow getFlow() {
return this.flow;
}
-
+
/**
* suspend the flow to prevent any message exchanges
*/
- public void suspend(){
- flow.suspend();
+ public void suspend() {
+ flow.suspend();
}
-
+
/**
* resume message exchange processing
*/
- public void resume(){
+ public void resume() {
flow.resume();
}
/**
* Route an ExchangePacket to a destination
- *
+ *
* @param exchange
* @throws JBIException
*/
@@ -234,24 +247,25 @@
ComponentContextImpl context = exchange.getSourceContext();
boolean remoteDestination = false;
if (theEndpoint == null) {
- //check in order, ServiceName then InterfaceName - //if remote, send on for remote broker to do the routing - //check to see if there is a match on the serviceName
+ // check in order, ServiceName then InterfaceName + // if remote, send on for remote broker to do the routing + // check to see if there is a match on the serviceName
if (serviceName != null) {
ServiceEndpoint[] endpoints = registry.getEndpointsForService(serviceName);
- //check against name (if set) - //theEndpoint = matchEndpointByName(endpoints,endpointName);
+ // check against name (if set) + // theEndpoint = + // matchEndpointByName(endpoints,endpointName);
theEndpoint = (ServiceEndpointImpl) getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
- // TODO should we match against operationName / interfaceName?
+ // TODO should we match against operationName / + // interfaceName?
if (theEndpoint == null) {
- //check to see if the target component is remote
+ // check to see if the target component is remote
ComponentConnector consumerConnector = registry.getComponentConnectorByExternalServiceName(serviceName);
if (consumerConnector != null) {
if (consumerConnector instanceof LocalComponentConnector) {
- theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet - .getSourceId(), exchange, endpoints);
+ theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet.getSourceId(), exchange, endpoints);
}
else {
remoteDestination = true;
@@ -259,23 +273,22 @@
}
}
else {
- log.warn("SericeName (" + serviceName
- + ") specified for routing, but can't find it registered");
+ log.warn("SericeName (" + serviceName + ") specified for routing, but can't find it registered");
}
}
}
if (theEndpoint == null && !remoteDestination && interfaceName != null) {
ServiceEndpoint[] endpoints = registry.getEndpoints(interfaceName);
- //check against name (if set) - //theEndpoint = matchEndpointByName(endpoints,endpointName);
+ // check against name (if set) + // theEndpoint = + // matchEndpointByName(endpoints,endpointName);
theEndpoint = (ServiceEndpointImpl) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
if (theEndpoint == null) {
- //check to see if the target component is remote
+ // check to see if the target component is remote
ComponentConnector consumerConnector = registry.getComponentConnector(interfaceName);
if (consumerConnector != null) {
if (consumerConnector instanceof LocalComponentConnector) {
- theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet - .getSourceId(), exchange, endpoints);
+ theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet.getSourceId(), exchange, endpoints);
}
else {
remoteDestination = true;
@@ -283,13 +296,13 @@
}
}
else {
- log.warn("SericeName (" + serviceName
- + ") specified for routing, but can't find it registered");
+ log.warn("SericeName (" + serviceName + ") specified for routing, but can't find it registered");
}
}
}
if (theEndpoint == null) {
- // lets use the resolver on the activation spec if applicable
+ // lets use the resolver on the activation spec if + // applicable
ActivationSpec activationSpec = exchange.getActivationSpec();
if (activationSpec != null) {
@@ -320,17 +333,19 @@
throwException = activationSpec.isFailIfNoDestinationEndpoint();
}
if (throwException) {
- throw new MessagingException("Could not find route for exchange: " + exchange
- + " for service: " + serviceName + " and interface: " + interfaceName);
+ throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + serviceName + " and interface: "
+ + interfaceName);
}
}
- if (log.isTraceEnabled()){
- log.trace("routing exchange " + packet + " TO: " + theEndpoint);
- }
+ if (log.isTraceEnabled()) {
+ log.trace("routing exchange " + packet + " TO: " + theEndpoint);
+ }
+ }
+
+ if (packet.getEndpoint() != null) {
+ flow.send(packet);
}
- - flow.send(packet); -
+
getSubscriptionManager().dispatchToSubscribers(exchange);
}
@@ -343,6 +358,7 @@
/**
* Set the default EndpointChooser
+ *
* @param defaultInterfaceChooser
*/
public void setDefaultInterfaceChooser(EndpointChooser defaultInterfaceChooser) {
@@ -358,6 +374,7 @@
/**
* Set default EndpointChooser
+ *
* @param defaultServiceChooser
*/
public void setDefaultServiceChooser(EndpointChooser defaultServiceChooser) {
@@ -365,8 +382,10 @@
}
/**
- * Returns the endpoint chooser for endpoints found by service which will use the chooser on the exchange's - * activation spec if available otherwise will use the default
+ * Returns the endpoint chooser for endpoints found by service which will + * use the chooser on the exchange's activation spec if available otherwise + * will use the default + *
* @param exchange
* @return the EndpointChooser
*/
@@ -382,10 +401,11 @@
return chooser;
}
-
/**
- * Returns the endpoint chooser for endpoints found by service which will use the chooser on the exchange's - * activation spec if available otherwise will use the default
+ * Returns the endpoint chooser for endpoints found by service which will + * use the chooser on the exchange's activation spec if available otherwise + * will use the default + *
* @param exchange
* @return the EndpointChooser
*/
@@ -402,7 +422,9 @@
}
/**
- * Factory method to create an endpoint filter for the given component context and message exchange
+ * Factory method to create an endpoint filter for the given component + * context and message exchange + *
* @param context
* @param exchange
* @return the EndpointFilter
@@ -417,7 +439,7 @@
return new ProducerComponentEndpointFilter(component);
}
}
-
+
/**
* Get an array of MBeanOperationInfo
*
@@ -428,9 +450,8 @@
OperationInfoHelper helper = new OperationInfoHelper();
helper.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing");
helper.addOperation(getObjectToManage(), "resume", "resume the NMR processing");
-
+
return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
}
-
}
servicemix/base/src/test/java/org/servicemix/components/jms
JmsBindingUsingEndpointSubscriptionTest.java added at 1.1
diff -N JmsBindingUsingEndpointSubscriptionTest.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ JmsBindingUsingEndpointSubscriptionTest.java 20 Sep 2005 17:38:15 -0000 1.1 @@ -0,0 +1,40 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.components.jms;
+
+import org.servicemix.TestSupport;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Revision$
+ */
+public class JmsBindingUsingEndpointSubscriptionTest extends TestSupport {
+
+ public void testSendMessagesToJmsThenOutofJmsToReceiver() throws Exception {
+ QName service = new QName("http://servicemix.org/cheese/", "jmsSender");
+
+ assertSendAndReceiveMessages(service);
+ }
+
+ protected AbstractXmlApplicationContext createBeanFactory() {
+ return new ClassPathXmlApplicationContext("org/servicemix/components/jms/example-endpoint-subscription.xml");
+ }
+}
