We have an application which is using ServiceMix 3.0-incubating with Spring
2.0 and ActiveMQ 4.1.0-incubator.  We are also building with Jencks 1.3. (It
is running under ServiceMix, not a J2EE app.)  Each ServiceMix component has
as its destinationService a broker component who determines (by
content-based routing) which service to send a message to next in a flow
which will ultimately use several components.

Here is a snippet illustrating how the message is sent from the broker
component to the next dynamic destination:

                    InOnly messageExchange = getDeliveryChannel()
                            .createExchangeFactory().createInOnlyExchange();
                    NormalizedMessage out = messageExchange.createMessage();

                    out.setContent(new StringSource(req.toXML()));
                    out.setProperty(req.getClass().getName(), req);
                    messageExchange.setInMessage(out);
                    messageExchange.setService(new
QName("http://my.com/myApp";, "myService",
                            "myPrefix"));

                    getDeliveryChannel().send(messageExchange);

At this level, It would appear to resolve only to a service, not a
particular endpoint. 

In deployment, I am attempting to run the app on two machines on the same
subnet with the same configuration (all services are available on each
machine) and start the process on one of the installations, and desire to
see messages being forwarded to other installations for processing, for
load-balancing.  

The machines being used in testing are different OS's and VM's, but all are
java 1.5.0.x.  Because a simple setup of the container using a unique name
and flowName="jms" running on different VM's caused the following exception,
I started investigating setting up an internal ActiveMQ broker so that it
could be configured to support sending messages in another way than
serialized.

2006-12-21 12:59:46,419 [ActiveMQ Session Task] ERROR -
JMSFlow$3.onMessage(291) | Error processing incoming broadcast message
javax.jms.JMSException: Failed to build body from bytes. Reason:
java.io.InvalidClassException: javax.xml.namespace.QName; local class
incompatible: stream classdesc serialVersionUID = -9120448754896609940,
local class serialVersionUID = 4418622981026545151

I finally ended up with the following setup (some of which may need to be
culled at this point) which has the homogenous nodes connecting and
negotiating a transport format, but all of the messages are still being
routed to components on the initiating node only.

Here is the current servicemix setup I have (gleaned from many sources over
the last couple of weeks and modified to use Spring):

=================================================

  <!-- the JBI container ("name" must be unique if running in a cluster) -->
  <sm:container 
                name="${HOSTNAME}"
        id="jbi"
        createJmxConnector="false"
        MBeanServer="#mbeanServer"
        dumpStats="true"
        statsInterval="10"
        persistent="true" depends-on="localbroker">

  <sm:flow>
    <bean class="org.apache.servicemix.jbi.nmr.flow.jms.JMSFlow">
      <property name="connectionFactory" ref="localFactory" />
    </bean>
  </sm:flow>   
     
    <sm:activationSpecs>

          <sm:activationSpec componentName="brokerHttpReceiver"         
                                                   
service="myPrefix:brokerHttpBinding"
                                                   endpoint="brokerHttpReceiver"
                                                   
destinationService="myPrefix:broker">
                  <sm:component>
                    <bean xmlns="http://xbean.org/schemas/spring/1.0";
                          
class="org.apache.servicemix.components.http.HttpConnector">
                        <property name="host" value="localhost"/>
                        <property name="port" value="8913"/>
                        <property name="defaultInOut" value="false"/>
                    </bean>
                  </sm:component>
          </sm:activationSpec>

      <sm:activationSpec componentName="IPTransactionBroker"
service="myPrefix:broker">
        <sm:component>
          <bean class="com.my.components.broker.IPTransactionBroker">
            <property name="workManager" ref="workManager" />
            <property name="definitionsManager" ref="IPTXDefinitionsManager"
/>
            <property name="transactionManager" ref="ipTransactionManager"
/>
          </bean>
        </sm:component>
      </sm:activationSpec>
...

  <!-- Most other components with destinationService="myPrefix:broker" are
dynamically loaded -->

    </sm:activationSpecs>
  </sm:container>      

  <!-- the work manager (thread pool) for the entire JBI container -->
  <bean id="workManager" class="org.jencks.factory.WorkManagerFactoryBean">
    <property name="threadPoolSize" value="30" />
  </bean>
  
   <!-- additions for clustering -->
   <bean id="transactionManagerImpl"
class="org.jencks.factory.TransactionManagerFactoryBean">
      <property name="defaultTransactionTimeoutSeconds" value="600"/>

      <property name="transactionLog">
         <bean
class="org.apache.geronimo.transaction.log.UnrecoverableLog"/>
      </property> 
   </bean>

   <bean id="transactionContextManager"
class="org.jencks.factory.TransactionContextManagerFactoryBean">
      <property name="transactionManager" ref="transactionManagerImpl"/>
   </bean>

   <bean id="connectionTracker"
     
class="org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTrackingCoordinator"/>
  
   <bean id="connectionManager"
class="org.jencks.factory.ConnectionManagerFactoryBean">
      <property name="transactionContextManager"
ref="transactionContextManager"/>
      <property name="connectionTracker" ref="connectionTracker"/>
   </bean>

   <bean id="bootstrapContext"
class="org.jencks.factory.BootstrapContextFactoryBean">
      <property name="workManager" ref="workManager"/>
   </bean>
   
   <bean id="jmsResourceAdapter"
class="org.apache.activemq.ra.ActiveMQResourceAdapter" singleton="true">
      <property name="connectionFactory" ref="localFactory"/>
      <property name="serverUrl" value="peer://localBroker-${HOSTNAME}"/>
<!-- see
http://incubator.apache.org/activemq/resource-adapter-properties.html -->
   </bean> 
     
  <bean id="jencks" class="org.jencks.JCAContainer">
     <property name="bootstrapContext" ref="bootstrapContext"/>
     <property name="resourceAdapter" ref="jmsResourceAdapter"/>
   </bean>

<!-- JMX server and connector -->
  <bean id="mbeanServer"
class="org.springframework.jmx.support.MBeanServerFactoryBean"/>

  <bean id="registry" class="mx4j.tools.naming.NamingService"
init-method="start">
    <!-- tell each RMI registry to run on it's own port! -->
    <property name="port" value="1092"/>
  </bean>

  <bean id="serverConnector"
class="org.springframework.jmx.support.ConnectorServerFactoryBean"
depends-on="registry">
    <property name="objectName" value="connector:name=rmi"/>
    <property name="serviceUrl"
value="service:jmx:rmi:///jndi/rmi://localhost:1092/jmxrmi"/>
    <property name="threaded" value="true"/>
    <property name="daemon" value="true"/>
  </bean>

  <bean id="managementContext"
class="org.apache.activemq.broker.jmx.ManagementContext">
    <property name="MBeanServer" ref="mbeanServer"/>
  </bean>       
        
  <bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
    init-method="start">
      <property name="brokerName" value="localBroker-${HOSTNAME}"/>
      <property name="persistent" value="true"/>
      <property name="managementContext" ref="managementContext"/>
      <property name="useJmx" value="true"/>

 <!-- http://en.wikipedia.org/wiki/Time_to_live ; also 
     
http://incubator.apache.org/activemq/activemq-3-transport-configurations.html  
      (timeToLive (int) defaults to 1 so we don't send multicast messages
beyond our own local subnet.-->

<!--using distributed queues, having equivalent weighting to queue receivers
across the network, 
    but only when the receivers are active -
http://www.activemq.org/site/networks-of-brokers.html -->
    <property name="networkConnectors">
        <list>
                <bean 
class="org.apache.activemq.network.DiscoveryNetworkConnector">
                        <property name="uri" value="multicast://ipc"/>
                        <!-- set TTL to 64 to attempt to get working also over 
VPN (unsuccessful)
-->
                        <property name="networkTTL" value="64"/>
                    <property name="name" value="queues_only"/>
                        <property name="dynamicOnly" value="true"/>
                <property name="conduitSubscriptions" value="false"/>
                <property name="decreaseNetworkConsumerPriority"
value="false"/>
                                <property name="excludedDestinations">
                                        <list>
                                                <bean 
class="org.apache.activemq.command.ActiveMQTopic">
                                                        <property 
name="physicalName" value=">"/>
                                                </bean>
                                        </list>
                        </property>
            </bean>
        </list>
    </property>

      <property name="transportConnectors">
        <list>
                <bean class="org.apache.activemq.broker.TransportConnector">
                    <property name="name" value="openwire"/>
                        <property name="uri" value="tcp://localhost:61616"/>
                        <property name="discoveryUri"
value="multicast://ipc?timeToLive=64"/>
                </bean>
                <bean class="org.apache.activemq.broker.TransportConnector">
                    <property name="name" value="stomp"/>
                        <property name="uri" value="stomp://localhost:61613"/>
                </bean>  
                <!--
                <bean class="org.apache.activemq.broker.TransportConnector">
                    <property name="name" value="ssl"/>
                        <property name="uri" value="ssl://localhost:61617"/>
                </bean>  
                -->     
        </list>
      </property>

        <!-- to facilitate testing... -->
        <!-- mostly from ActiveMQ example, which does send from producer on one
machine to consumer on another: -->
      <property name="destinationPolicy">
        <bean class="org.apache.activemq.broker.region.policy.PolicyMap">
                <property name="defaultEntry">
                        <bean 
class="org.apache.activemq.broker.region.policy.PolicyEntry">
                                <property name="dispatchPolicy">                
                
                                        <bean
class="org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy"/>
                                </property>     
                                <property name="destination">
                                        <bean 
class="org.apache.activemq.command.ActiveMQQueue">
                                                <constructor-arg 
value="org.apache.servicemix.jms.${HOSTNAME}"/>
                                        </bean>
                                </property>
                                <property name="subscriptionRecoveryPolicy">
                                        <bean
class="org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy"/>
                                </property>
                        </bean>
                </property>
        </bean>
      </property>
      
      <!-- to facilitate testing... -->
      <bean factory-method="getBroker" factory-bean="ipc_jbi">
                <property name="defaultServiceChooser">
                <bean 
class="org.apache.servicemix.jbi.resolver.RandomChoicePolicy" />
                </property>
          </bean> 
      <!-- -->
  </bean>

  <!-- JMS ConnectionFactory to use local broker -->
  <!-- use depends-on to prevent creating JMS connections before broker
starts; 
           see http://www.activemq.org/site/vm-transport-reference.html -->
  <bean id="localFactory" depends-on="localbroker"
    class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="peer://localBroker-${HOSTNAME}" />
    <property name="statsEnabled" value="true"/> 

    <!-- to facilitate testing... -->
    <property name="prefetchPolicy">
        <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <property name="queuePrefetch" value="1"/>
                <property name="inputStreamPrefetch" value="1"/>
                <property name="queueBrowserPrefetch" value="1"/>
        </bean>
    </property>

  </bean> 
  
</beans>
=================================================

Some questions, then:

1)  Am I right to expect the transactions (messages) to be routed to other
installations of this configuration on the same subnet?

2)  Should I be able to see MBeans for all the queues for all started
installations when using jconsole to view
"service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"
(or "service:jmx:rmi:///jndi/rmi://localhost:1092/jmxrmi", setup this way to
prevent an error "ObjID already in use"), or just the local queues and then
connections for the other installations?  (Does JMX even matter for
ServiceMix routing to clustered nodes?)

3)  I found the following possibly very relevant posting on the "ServiceMix
- Dev" list:
http://www.nabble.com/-jira--Updated%3A-%28SM-344%29-Improve-clustering-by-allowing-implicit-endpoint-selection-to-be-done-by-the-flow-tf1358361s12049.html#a3638632
- was this ever addressed? (The JIRA posting's status is still "Open".)

4)  If the improvement in (3) is not done, what else should I be doing to
cause the behavior that I am seeking?

Thanks for any light that anyone can shed on this problem.

- Ellen



-- 
View this message in context: 
http://www.nabble.com/Endpoint-Determination-Made-By-Flow-in-a-Clustered-Environment--tf2868246s12049.html#a8016723
Sent from the ServiceMix - User mailing list archive at Nabble.com.

Reply via email to