I setup a test system to better understand how networked brokers interact and
messages, etc. are seen via JMX. The receiving broker is receiving the
messages sent by the transmit broker but when I view it via JMX it shows
that the messages were not dequeued. I've ran a network test before and
viewed it as the messages were dequeued so I don't know what the difference
is this time. On the transmit side the JMX shows 0 for producers, queue
size, dispatch count, enqueue count, etc. Why does the side networking to
another broker show nothing for these but the server side broker does? Code
below

JMSMonitor.java  // used to monitor via JMX
public class JMSMonitor
{
   public static void main(final String[] args)
   {
      if(args.length != 3)
      {
         System.out.println("Invalid argument set:Three arguments required:
IP, broker domain name and broker name");
      }
      else
      {
         try
         {
            final String ip = args[0];
            final String domainName = args[1];
            final String brokerName = args[2];

            final int rmiregport = 1099;

            final String url = "service:jmx:rmi:///jndi/rmi://" + ip + ":" +
rmiregport + "/jmxrmi";
            System.out.println("JMXServiceURL address=" + url);

            final JMXServiceURL address = new JMXServiceURL(url);
            final JMXConnector connector =
JMXConnectorFactory.connect(address,
                                                                      
null);

            connector.connect();

            final MBeanServerConnection connection =
connector.getMBeanServerConnection();

            final ObjectName jmsManagementObjectName = new
ObjectName(domainName + ":BrokerName=" + brokerName + ",Type=Broker");

            final BrokerViewMBean brokerMBean =
MBeanServerInvocationHandler.newProxyInstance(connection, 
jmsManagementObjectName, BrokerViewMBean.class, true);

            final Console console = System.console();
            String userInput;

            do
            {
               System.out.println("Number of queues=" +
brokerMBean.getQueues().length);

               for(final ObjectName queueName : brokerMBean.getQueues())
               {
                  final QueueViewMBean queueMbean =
MBeanServerInvocationHandler.newProxyInstance(connection, queueName,
QueueViewMBean.class, true);

                  if(queueMbean.getName().startsWith("Van") ||
queueMbean.getName().equals("START"))
                  {
                     System.out.println("Queue name=" + queueMbean.getName()
+
                                        ", consumer count=" +
queueMbean.getConsumerCount() +
                                        ", producer count=" +
queueMbean.getProducerCount() +
                                        ", queue size=" +
queueMbean.getQueueSize() +
                                        ", dispatch count=" +
queueMbean.getDispatchCount() +
                                        ", enqueue count=" +
queueMbean.getEnqueueCount() +
                                        ", dequeue count=" +
queueMbean.getDequeueCount());
                  }
               }

               userInput = console.readLine("Enter 'exit' to end or any key
to continue\n==>");
            }
            while(userInput.equalsIgnoreCase("exit") == false);
         }
         catch(final Exception e)
         {
            e.printStackTrace();
         }
      }
   }
}

activemq.xml  // Transmit broker
<beans
  xmlns="http://www.springframework.org/schema/beans";
  xmlns:amq="http://activemq.apache.org/schema/core";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>

    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>      
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core";
            brokerName="networkTestTransmit"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">
 
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy> 
 
        <managementContext>
            <managementContext connectorPort="1099"
                               jmxDomainName="transmit" />
        </managementContext>

        <networkConnectors>
            <networkConnector uri="static:(tcp://IP of receiving
broker:61616)"
                              duplex="true"/>
        </networkConnectors>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>
        
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
    </broker>

    <import resource="jetty.xml"/>
</beans>

MsgSenderTest.java  // Posts JMS messages
public class MsgSenderTest
{
   private static ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://IP of receiver computer:61616");

   /**
    * @param args
    */
   public static void main(final String[] args_)
   {
      System.out.println("Connecting to ActiveMQ:" +
MsgSenderTest.connectionFactory.getBrokerURL());

      if(args_.length < 2)
      {
         System.out.println("Test number not specified as first argument or
number of messages as second argument");
         System.exit(0);
      }

      Connection connection = null;
      Session startTopicSession = null;
      MessageProducer startProducer = null;

      try
      {
         final int numberOfMessages = Integer.parseInt(args_[1]);

         connection = MsgSenderTest.connectionFactory.createConnection();
         connection.start();

         startTopicSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

         final Queue startQueue =
startTopicSession.createQueue("VanTestTransmit" + args_[0]);

         startProducer = startTopicSession.createProducer(startQueue);

         for(int i = 0; i < numberOfMessages; i++)
         {
            System.out.println("Sending message #" + (i + 1));
            startProducer.send(startTopicSession.createMessage());

            try
            {
               Thread.sleep(500);
            }
            catch(final Exception e2){}
         }

         final Message message = startTopicSession.createMessage();

         message.setStringProperty("END", "");
         startProducer.send(message);
      }
      catch(final JMSException e)            {              
e.printStackTrace();            }
      finally
      {
         if(startProducer != null)
         {
            try
            {
               System.out.println("Closing Producer");
               startProducer.close();
            }
            catch(final JMSException e)            {              
e.printStackTrace();            }
         }

         if(startTopicSession != null)
         {
            try
            {
               System.out.println("Closing Session");
               startTopicSession.close();
            }
            catch(final JMSException e)            {              
e.printStackTrace();            }
         }

         if(connection != null)
         {
            try
            {
               System.out.println("Closing Connection");
               connection.close();
            }
            catch(final JMSException e)            {              
e.printStackTrace();            }
         }
      }
   }
}

activemq.xml  // ActiveMQ config file for receiving broker
<beans
  xmlns="http://www.springframework.org/schema/beans";
  xmlns:amq="http://activemq.apache.org/schema/core";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>      
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core";
            brokerName="networkTestReceive"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">
 
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy> 

        <managementContext>
            <managementContext connectorPort="1099"
                               jmxDomainName="receive" />
        </managementContext>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>
        
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>
    </broker>
    <import resource="jetty.xml"/>
</beans>

MsgListenerTest.java  // Receives messages
public class MsgListenerTest implements MessageListener
{
   private static ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");

   private Connection connection;
   private Session session;
   private MessageConsumer consumer;
   private int msgNumber = 1;

   public static void main(final String[] args_)
   {
      if(args_.length < 1)
      {
         System.out.println("Queue id not defined in arg #1");
         System.exit(0);
      }

      new MsgListenerTest("VanTestTransmit" + args_[0]);
   }


   public MsgListenerTest(final String queueName_)
   {
      try
      {
         this.connection =
MsgListenerTest.connectionFactory.createConnection();
         this.connection.start();

         this.session = this.connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);

         this.consumer =
this.session.createConsumer(this.session.createQueue(queueName_));

         this.consumer.setMessageListener(this);
      }
      catch(final Exception e)
      {
         e.printStackTrace();
      }
   }

   @Override
   public void onMessage(final Message message_)
   {
      try
      {
         if(message_.getStringProperty("END") == null)
         {
            System.out.println("Received messaage #" + this.msgNumber);

            this.msgNumber++;
         }
         else
         {
            if(this.consumer != null)
            {
               System.out.println("Closing Consumer");
               try
               {
                  this.consumer.close();
               }
               catch(final JMSException e)               {                 
e.printStackTrace();               }
            }

            if(this.session != null)
            {
               System.out.println("Closing Session");
               try
               {
                  this.session.close();
               }
               catch(final JMSException e)               {                 
e.printStackTrace();               }
            }

            if(this.connection != null)
            {
               System.out.println("Closing Connection");
               try
               {
                  this.connection.close();
               }
               catch(final JMSException e)               {                 
e.printStackTrace();               }
            }
         }
      }
      catch(final JMSException e2)      {         e2.printStackTrace();     
}
   }
}

--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Understanding-Networking-and-JMX-tp3922968p3922968.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to