I setup a test system to better understand how networked brokers interact and messages, etc. are seen via JMX. The receiving broker is receiving the messages sent by the transmit broker but when I view it via JMX it shows that the messages were not dequeued. I've ran a network test before and viewed it as the messages were dequeued so I don't know what the difference is this time. On the transmit side the JMX shows 0 for producers, queue size, dispatch count, enqueue count, etc. Why does the side networking to another broker show nothing for these but the server side broker does? Code below
JMSMonitor.java // used to monitor via JMX public class JMSMonitor { public static void main(final String[] args) { if(args.length != 3) { System.out.println("Invalid argument set:Three arguments required: IP, broker domain name and broker name"); } else { try { final String ip = args[0]; final String domainName = args[1]; final String brokerName = args[2]; final int rmiregport = 1099; final String url = "service:jmx:rmi:///jndi/rmi://" + ip + ":" + rmiregport + "/jmxrmi"; System.out.println("JMXServiceURL address=" + url); final JMXServiceURL address = new JMXServiceURL(url); final JMXConnector connector = JMXConnectorFactory.connect(address, null); connector.connect(); final MBeanServerConnection connection = connector.getMBeanServerConnection(); final ObjectName jmsManagementObjectName = new ObjectName(domainName + ":BrokerName=" + brokerName + ",Type=Broker"); final BrokerViewMBean brokerMBean = MBeanServerInvocationHandler.newProxyInstance(connection, jmsManagementObjectName, BrokerViewMBean.class, true); final Console console = System.console(); String userInput; do { System.out.println("Number of queues=" + brokerMBean.getQueues().length); for(final ObjectName queueName : brokerMBean.getQueues()) { final QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean.class, true); if(queueMbean.getName().startsWith("Van") || queueMbean.getName().equals("START")) { System.out.println("Queue name=" + queueMbean.getName() + ", consumer count=" + queueMbean.getConsumerCount() + ", producer count=" + queueMbean.getProducerCount() + ", queue size=" + queueMbean.getQueueSize() + ", dispatch count=" + queueMbean.getDispatchCount() + ", enqueue count=" + queueMbean.getEnqueueCount() + ", dequeue count=" + queueMbean.getDequeueCount()); } } userInput = console.readLine("Enter 'exit' to end or any key to continue\n==>"); } while(userInput.equalsIgnoreCase("exit") == false); } catch(final Exception e) { e.printStackTrace(); } } } } activemq.xml // Transmit broker <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.base}/conf/credentials.properties</value> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="networkTestTransmit" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb"> <pendingSubscriberPolicy> <vmCursor /> </pendingSubscriberPolicy> </policyEntry> <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext connectorPort="1099" jmxDomainName="transmit" /> </managementContext> <networkConnectors> <networkConnector uri="static:(tcp://IP of receiving broker:61616)" duplex="true"/> </networkConnectors> <persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/> </transportConnectors> </broker> <import resource="jetty.xml"/> </beans> MsgSenderTest.java // Posts JMS messages public class MsgSenderTest { private static ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP of receiver computer:61616"); /** * @param args */ public static void main(final String[] args_) { System.out.println("Connecting to ActiveMQ:" + MsgSenderTest.connectionFactory.getBrokerURL()); if(args_.length < 2) { System.out.println("Test number not specified as first argument or number of messages as second argument"); System.exit(0); } Connection connection = null; Session startTopicSession = null; MessageProducer startProducer = null; try { final int numberOfMessages = Integer.parseInt(args_[1]); connection = MsgSenderTest.connectionFactory.createConnection(); connection.start(); startTopicSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); final Queue startQueue = startTopicSession.createQueue("VanTestTransmit" + args_[0]); startProducer = startTopicSession.createProducer(startQueue); for(int i = 0; i < numberOfMessages; i++) { System.out.println("Sending message #" + (i + 1)); startProducer.send(startTopicSession.createMessage()); try { Thread.sleep(500); } catch(final Exception e2){} } final Message message = startTopicSession.createMessage(); message.setStringProperty("END", ""); startProducer.send(message); } catch(final JMSException e) { e.printStackTrace(); } finally { if(startProducer != null) { try { System.out.println("Closing Producer"); startProducer.close(); } catch(final JMSException e) { e.printStackTrace(); } } if(startTopicSession != null) { try { System.out.println("Closing Session"); startTopicSession.close(); } catch(final JMSException e) { e.printStackTrace(); } } if(connection != null) { try { System.out.println("Closing Connection"); connection.close(); } catch(final JMSException e) { e.printStackTrace(); } } } } } activemq.xml // ActiveMQ config file for receiving broker <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd" <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.base}/conf/credentials.properties</value> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="networkTestReceive" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb"> <pendingSubscriberPolicy> <vmCursor /> </pendingSubscriberPolicy> </policyEntry> <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext connectorPort="1099" jmxDomainName="receive" /> </managementContext> <persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/> </transportConnectors> </broker> <import resource="jetty.xml"/> </beans> MsgListenerTest.java // Receives messages public class MsgListenerTest implements MessageListener { private static ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); private Connection connection; private Session session; private MessageConsumer consumer; private int msgNumber = 1; public static void main(final String[] args_) { if(args_.length < 1) { System.out.println("Queue id not defined in arg #1"); System.exit(0); } new MsgListenerTest("VanTestTransmit" + args_[0]); } public MsgListenerTest(final String queueName_) { try { this.connection = MsgListenerTest.connectionFactory.createConnection(); this.connection.start(); this.session = this.connection.createSession(true, Session.AUTO_ACKNOWLEDGE); this.consumer = this.session.createConsumer(this.session.createQueue(queueName_)); this.consumer.setMessageListener(this); } catch(final Exception e) { e.printStackTrace(); } } @Override public void onMessage(final Message message_) { try { if(message_.getStringProperty("END") == null) { System.out.println("Received messaage #" + this.msgNumber); this.msgNumber++; } else { if(this.consumer != null) { System.out.println("Closing Consumer"); try { this.consumer.close(); } catch(final JMSException e) { e.printStackTrace(); } } if(this.session != null) { System.out.println("Closing Session"); try { this.session.close(); } catch(final JMSException e) { e.printStackTrace(); } } if(this.connection != null) { System.out.println("Closing Connection"); try { this.connection.close(); } catch(final JMSException e) { e.printStackTrace(); } } } } catch(final JMSException e2) { e2.printStackTrace(); } } } -- View this message in context: http://activemq.2283324.n4.nabble.com/Understanding-Networking-and-JMX-tp3922968p3922968.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.