Commit in servicemix/base on MAIN
src/test/resources/org/servicemix/components/jms/example-endpoint-subscription.xml+82added 1.1
project.xml+31.85 -> 1.86
src/main/java/org/servicemix/jbi/nmr/Broker.java+88-671.20 -> 1.21
src/test/java/org/servicemix/components/jms/JmsBindingUsingEndpointSubscriptionTest.java+40added 1.1
+213-67
2 added + 2 modified, total 4 files
added test case for component subscription model, which is currently excluded due to it not being implemented yet :)

servicemix/base/src/test/resources/org/servicemix/components/jms
example-endpoint-subscription.xml added at 1.1
diff -N example-endpoint-subscription.xml
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ example-endpoint-subscription.xml	20 Sep 2005 17:38:15 -0000	1.1
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns:foo="http://servicemix.org/cheese/">
+
+  <!-- the JBI container -->
+  <container id="jbi">
+    <components>
+
+      <component id="jmsSender" service="foo:jmsSender"
+        class="org.servicemix.components.jms.JmsSenderComponent">
+        <property name="template">
+          <bean class="org.springframework.jms.core.JmsTemplate">
+            <property name="connectionFactory">
+              <ref local="jmsFactory" />
+            </property>
+            <property name="defaultDestinationName"
+              value="test.org.servicemix.example.1" />
+            <property name="pubSubDomain" value="true" />
+          </bean>
+        </property>
+      </component>
+
+      <!-- START SNIPPET: publish -->
+      <component id="jmsReceiver" service="foo:jmsReceiver"
+        failIfNoDestinationEndpoint="false"
+        class="org.servicemix.components.jms.JmsReceiverComponent">
+        <!-- END SNIPPET: publish -->
+        <property name="template">
+          <bean class="org.springframework.jms.core.JmsTemplate">
+            <property name="connectionFactory">
+              <ref local="jmsFactory" />
+            </property>
+            <property name="defaultDestinationName"
+              value="test.org.servicemix.example.1" />
+            <property name="pubSubDomain" value="true" />
+          </bean>
+        </property>
+      </component>
+
+
+      <!-- START SNIPPET: subscribe -->
+      <component id="receiver" service="foo:receiver"
+        class="org.servicemix.examples.ReceiverComponent">
+
+        <!--  lets subscribe to the output of the foo:jmsReceiver component -->
+        <subscribe service="foo:jmsReceiver" />
+      </component>
+      <!-- END SNIPPET: subscribe -->
+
+
+      <component id="trace" service="foo:trace"
+        class="org.servicemix.components.groovy.GroovyComponent">
+        <subscribe service="foo:jmsReceiver" />
+
+        <property name="disableOutput" value="true" />
+        <property name="scriptText">
+          <value>
+            <![CDATA[
+log.info "Received message $inMessage with body: $inMessage.bodyText"
+            ]]>
+          </value>
+        </property>
+      </component>
+    </components>
+  </container>
+
+  <bean id="client"
+    class="org.servicemix.client.DefaultServiceMixClient">
+    <constructor-arg ref="jbi" />
+  </bean>
+
+  <bean id="jmsFactory"
+    class="org.activemq.pool.PooledConnectionFactory">
+    <property name="connectionFactory">
+      <bean class="org.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL">
+          <value>vm://localhost</value>
+        </property>
+      </bean>
+    </property>
+  </bean>
+
+</beans>

servicemix/base
project.xml 1.85 -> 1.86
diff -u -r1.85 -r1.86
--- project.xml	20 Sep 2005 11:00:30 -0000	1.85
+++ project.xml	20 Sep 2005 17:38:15 -0000	1.86
@@ -1003,6 +1003,9 @@
         <exclude>**/ScriptTest.*</exclude>
         <exclude>**/ReflectionBindingTest.*</exclude>
 
+        <!-- TODO temporary disabled until subscriptions are enabled -->
+        <exclude>**/JmsBindingUsingEndpointSubscriptionTest.*</exclude>
+
         <!-- need a HTT test server... -->
         <exclude>**/HttpTest.*</exclude>
 

servicemix/base/src/main/java/org/servicemix/jbi/nmr
Broker.java 1.20 -> 1.21
diff -u -r1.20 -r1.21
--- Broker.java	20 Sep 2005 16:52:24 -0000	1.20
+++ Broker.java	20 Sep 2005 17:38:15 -0000	1.21
@@ -54,10 +54,10 @@
 
 /**
  * The Broker handles Nomalised Message Routing within ServiceMix
- *
- * @version $Revision: 1.20 $
+ * 
+ * @version $Revision: 1.21 $
  */
-public class Broker  extends BaseLifeCycle {
+public class Broker extends BaseLifeCycle {
     private JBIContainer container;
     private Registry registry;
     private String flowName = "seda";
@@ -73,15 +73,16 @@
      */
     public Broker() {
     }
-    
+
     /**
      * Get the description
+     * 
      * @return description
      */
-    public String getDescription(){
+    public String getDescription() {
         return "Normalized Message Router";
     }
-    
+
     /**
      * @return Returns the workManager.
      */
@@ -90,11 +91,12 @@
     }
 
     /**
-     * @param workManager The workManager to set.
+     * @param workManager
+     *            The workManager to set.
      */
     public void setWorkManager(WorkManager workManager) {
         this.workManager = workManager;
-    }    
+    }
 
     public SubscriptionManager getSubscriptionManager() {
         return subscriptionManager;
@@ -109,6 +111,7 @@
 
     /**
      * initialize the broker
+     * 
      * @param container
      * @throws JBIException
      */
@@ -122,103 +125,113 @@
         this.flow.init(this);
         getSubscriptionManager().init(this);
     }
-    
+
     /**
      * Get the name of the Container
+     * 
      * @return containerName
      */
-    public String getContainerName(){
+    public String getContainerName() {
         return container.getName();
     }
-    
+
     /**
      * Get the ManagementContext
+     * 
      * @return the managementContext
      */
-    public ManagementContext getManagementContext(){
+    public ManagementContext getManagementContext() {
         return container.getManagementContext();
     }
-    
+
     /**
      * Get the Registry
+     * 
      * @return the registry
      */
-    public Registry getRegistry(){
+    public Registry getRegistry() {
         return registry;
     }
+
     /**
      * start brokering
+     * 
      * @throws JBIException
      */
-    public void start() throws JBIException{
+    public void start() throws JBIException {
         flow.start();
         super.start();
     }
-    
+
     /**
      * stop brokering
+     * 
      * @throws JBIException
      */
-    public void stop() throws JBIException{
+    public void stop() throws JBIException {
         flow.stop();
         super.stop();
     }
-    
+
     /**
      * shutdown all Components
+     * 
      * @throws JBIException
      */
-    public void shutDown() throws JBIException{
+    public void shutDown() throws JBIException {
         stop();
         flow.shutDown();
         super.shutDown();
     }
-    
+
     /**
      * @return Returns the flow.
      */
     public String getFlowName() {
         return flowName;
     }
+
     /**
-     * @param flowName The flow to set.
+     * @param flowName
+     *            The flow to set.
      */
     public void setFlowName(String flowName) {
         this.flowName = flowName;
     }
-    
+
     /**
      * Set the flow
+     * 
      * @param flow
      */
-    public void setFlow(Flow flow){
+    public void setFlow(Flow flow) {
         this.flow = flow;
     }
-    
+
     /**
      * @return the Flow
      */
-    public Flow getFlow(){
+    public Flow getFlow() {
         return this.flow;
     }
-    
+
     /**
      * suspend the flow to prevent any message exchanges
      */
-    public void suspend(){
-       flow.suspend();
+    public void suspend() {
+        flow.suspend();
     }
-     
+
     /**
      * resume message exchange processing
      */
-    public void resume(){
+    public void resume() {
         flow.resume();
     }
 
     /**
      * Route an ExchangePacket to a destination
-     *
+     * 
      * @param exchange
      * @throws JBIException
      */
@@ -234,24 +247,25 @@
             ComponentContextImpl context = exchange.getSourceContext();
             boolean remoteDestination = false;
             if (theEndpoint == null) {
-                //check in order, ServiceName then InterfaceName
-                //if remote, send on for remote broker to do the routing
-                //check to see if there is a match on the serviceName
+                // check in order, ServiceName then InterfaceName
+                // if remote, send on for remote broker to do the routing
+                // check to see if there is a match on the serviceName
                 if (serviceName != null) {
                     ServiceEndpoint[] endpoints = registry.getEndpointsForService(serviceName);
-                    //check against name (if set)
-                    //theEndpoint = matchEndpointByName(endpoints,endpointName);
+                    // check against name (if set)
+                    // theEndpoint =
+                    // matchEndpointByName(endpoints,endpointName);
                     theEndpoint = (ServiceEndpointImpl) getServiceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
 
-                    // TODO should we match against operationName / interfaceName?
+                    // TODO should we match against operationName /
+                    // interfaceName?
 
                     if (theEndpoint == null) {
-                        //check to see if the target component is remote
+                        // check to see if the target component is remote
                         ComponentConnector consumerConnector = registry.getComponentConnectorByExternalServiceName(serviceName);
                         if (consumerConnector != null) {
                             if (consumerConnector instanceof LocalComponentConnector) {
-                                theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet
-                                        .getSourceId(), exchange, endpoints);
+                                theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet.getSourceId(), exchange, endpoints);
                             }
                             else {
                                 remoteDestination = true;
@@ -259,23 +273,22 @@
                             }
                         }
                         else {
-                            log.warn("SericeName (" + serviceName
-                                    + ") specified for routing, but can't find it registered");
+                            log.warn("SericeName (" + serviceName + ") specified for routing, but can't find it registered");
                         }
                     }
                 }
                 if (theEndpoint == null && !remoteDestination && interfaceName != null) {
                     ServiceEndpoint[] endpoints = registry.getEndpoints(interfaceName);
-                    //check against name (if set)
-                    //theEndpoint = matchEndpointByName(endpoints,endpointName);
+                    // check against name (if set)
+                    // theEndpoint =
+                    // matchEndpointByName(endpoints,endpointName);
                     theEndpoint = (ServiceEndpointImpl) getInterfaceChooser(exchange).chooseEndpoint(endpoints, context, exchange);
                     if (theEndpoint == null) {
-                        //check to see if the target component is remote
+                        // check to see if the target component is remote
                         ComponentConnector consumerConnector = registry.getComponentConnector(interfaceName);
                         if (consumerConnector != null) {
                             if (consumerConnector instanceof LocalComponentConnector) {
-                                theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet
-                                        .getSourceId(), exchange, endpoints);
+                                theEndpoint = registry.getMatch((LocalComponentConnector) consumerConnector, packet.getSourceId(), exchange, endpoints);
                             }
                             else {
                                 remoteDestination = true;
@@ -283,13 +296,13 @@
                             }
                         }
                         else {
-                            log.warn("SericeName (" + serviceName
-                                    + ") specified for routing, but can't find it registered");
+                            log.warn("SericeName (" + serviceName + ") specified for routing, but can't find it registered");
                         }
                     }
                 }
                 if (theEndpoint == null) {
-                    // lets use the resolver on the activation spec if applicable
+                    // lets use the resolver on the activation spec if
+                    // applicable
 
                     ActivationSpec activationSpec = exchange.getActivationSpec();
                     if (activationSpec != null) {
@@ -320,17 +333,19 @@
                     throwException = activationSpec.isFailIfNoDestinationEndpoint();
                 }
                 if (throwException) {
-                    throw new MessagingException("Could not find route for exchange: " + exchange
-                            + " for service: " + serviceName + " and interface: " + interfaceName);
+                    throw new MessagingException("Could not find route for exchange: " + exchange + " for service: " + serviceName + " and interface: "
+                            + interfaceName);
                 }
             }
-           if (log.isTraceEnabled()){
-               log.trace("routing exchange " + packet + " TO: " + theEndpoint);
-           }
+            if (log.isTraceEnabled()) {
+                log.trace("routing exchange " + packet + " TO: " + theEndpoint);
+            }
+        }
+
+        if (packet.getEndpoint() != null) {
+            flow.send(packet);
         }
-        
-        flow.send(packet);
-        
+
         getSubscriptionManager().dispatchToSubscribers(exchange);
     }
 
@@ -343,6 +358,7 @@
 
     /**
      * Set the default EndpointChooser
+     * 
      * @param defaultInterfaceChooser
      */
     public void setDefaultInterfaceChooser(EndpointChooser defaultInterfaceChooser) {
@@ -358,6 +374,7 @@
 
     /**
      * Set default EndpointChooser
+     * 
      * @param defaultServiceChooser
      */
     public void setDefaultServiceChooser(EndpointChooser defaultServiceChooser) {
@@ -365,8 +382,10 @@
     }
 
     /**
-     * Returns the endpoint chooser for endpoints found by service which will use the chooser on the exchange's
-     * activation spec if available otherwise will use the default
+     * Returns the endpoint chooser for endpoints found by service which will
+     * use the chooser on the exchange's activation spec if available otherwise
+     * will use the default
+     * 
      * @param exchange
      * @return the EndpointChooser
      */
@@ -382,10 +401,11 @@
         return chooser;
     }
 
-
     /**
-     * Returns the endpoint chooser for endpoints found by service which will use the chooser on the exchange's
-     * activation spec if available otherwise will use the default
+     * Returns the endpoint chooser for endpoints found by service which will
+     * use the chooser on the exchange's activation spec if available otherwise
+     * will use the default
+     * 
      * @param exchange
      * @return the EndpointChooser
      */
@@ -402,7 +422,9 @@
     }
 
     /**
-     * Factory method to create an endpoint filter for the given component context and message exchange
+     * Factory method to create an endpoint filter for the given component
+     * context and message exchange
+     * 
      * @param context
      * @param exchange
      * @return the EndpointFilter
@@ -417,7 +439,7 @@
             return new ProducerComponentEndpointFilter(component);
         }
     }
-    
+
     /**
      * Get an array of MBeanOperationInfo
      * 
@@ -428,9 +450,8 @@
         OperationInfoHelper helper = new OperationInfoHelper();
         helper.addOperation(getObjectToManage(), "suspend", "suspend the NMR processing");
         helper.addOperation(getObjectToManage(), "resume", "resume the NMR processing");
-       
+
         return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
     }
 
-    
 }

servicemix/base/src/test/java/org/servicemix/components/jms
JmsBindingUsingEndpointSubscriptionTest.java added at 1.1
diff -N JmsBindingUsingEndpointSubscriptionTest.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ JmsBindingUsingEndpointSubscriptionTest.java	20 Sep 2005 17:38:15 -0000	1.1
@@ -0,0 +1,40 @@
+/** 
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.servicemix.components.jms;
+
+import org.servicemix.TestSupport;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Revision$
+ */
+public class JmsBindingUsingEndpointSubscriptionTest extends TestSupport {
+
+    public void testSendMessagesToJmsThenOutofJmsToReceiver() throws Exception {
+        QName service = new QName("http://servicemix.org/cheese/", "jmsSender");
+
+        assertSendAndReceiveMessages(service);
+    }
+
+    protected AbstractXmlApplicationContext createBeanFactory() {
+        return new ClassPathXmlApplicationContext("org/servicemix/components/jms/example-endpoint-subscription.xml");
+    }
+}
CVSspam 0.2.8



Reply via email to