I've created a simple setup to test persistent messages on durable topics but I'm experiencing some erratic behavior.
Producer and consumer are copied straight from the examples and pasted below. I'm running two standalone 5.1.0 brokers on two hosts with the default config, and both the producer and the consumer use dynamic discovery.

The publisher sends 100 messages. After message 10 I CTRL+C'ed the consumer, then restarted it about 10 seconds later, around message 20. It never received messages 11-20. This only happens the first time after the brokers are started. When I leave the brokers running and repeat the test, everything works fine and I get the missed messages. I also noticed that the consumer connected to the second broker when I started it the second time.

####################################################
# Consumer Output
####################################################
C:\eclipse\workspace\jms-clients\bin>java -cp c:\dev\apache-activemq-5.1.0\activemq-all-5.1.0.jar;. AMQTopicSubscriber
Jun 19, 2008 12:47:48 PM org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://mercury.xyzcorp.com:61616
Jun 19, 2008 12:47:48 PM org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://saturn.xyzcorp.com:61616
Jun 19, 2008 12:47:48 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://saturn.xyzcorp.com:61616
Connected to topic.
Received: Message: 0 sent at: Thu Jun 19 12:47:53 EDT 2008
Received: Message: 1 sent at: Thu Jun 19 12:47:54 EDT 2008
Received: Message: 2 sent at: Thu Jun 19 12:47:55 EDT 2008
Received: Message: 3 sent at: Thu Jun 19 12:47:56 EDT 2008
Received: Message: 4 sent at: Thu Jun 19 12:47:57 EDT 2008
Received: Message: 5 sent at: Thu Jun 19 12:47:58 EDT 2008
Received: Message: 6 sent at: Thu Jun 19 12:47:59 EDT 2008
Received: Message: 7 sent at: Thu Jun 19 12:48:00 EDT 2008
Received: Message: 8 sent at: Thu Jun 19 12:48:01 EDT 2008
Received: Message: 9 sent at: Thu Jun 19 12:48:02 EDT 2008
Received: Message: 10 sent at: Thu Jun 19 12:48:03 EDT 2008

C:\eclipse\workspace\jms-clients\bin>java -cp c:\dev\apache-activemq-5.1.0\activemq-all-5.1.0.jar;. AMQTopicSubscriber
Jun 19, 2008 12:48:14 PM org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://saturn.xyzcorp.com:61616
Jun 19, 2008 12:48:14 PM org.apache.activemq.transport.discovery.DiscoveryTransport onServiceAdd
INFO: Adding new broker connection URL: tcp://mercury.xyzcorp.com:61616
Jun 19, 2008 12:48:14 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://mercury.xyzcorp.com:61616
Connected to topic.
Received: Message: 21 sent at: Thu Jun 19 12:48:14 EDT 2008
Received: Message: 22 sent at: Thu Jun 19 12:48:15 EDT 2008
Received: Message: 23 sent at: Thu Jun 19 12:48:16 EDT 2008
Received: Message: 24 sent at: Thu Jun 19 12:48:17 EDT 2008
Received: Message: 25 sent at: Thu Jun 19 12:48:18 EDT 2008
Received: Message: 26 sent at: Thu Jun 19 12:48:19 EDT 2008
Received: Message: 27 sent at: Thu Jun 19 12:48:20 EDT 2008
Received: Message: 28 sent at: Thu Jun 19 12:48:21 EDT 2008
Received: Message: 29 sent at: Thu Jun 19 12:48:22 EDT 2008
Received: Message: 30 sent at: Thu Jun 19 12:48:23 EDT 2008
Received: Message: 31 sent at: Thu Jun 19 12:48:24 EDT 2008
(all the way to Message 99)

####################################################
# Broker 1 Startup
####################################################
[EMAIL PROTECTED] bin]$ ./activemq
ACTIVEMQ_HOME: /usr/local/apache-activemq-5.1.0
ACTIVEMQ_BASE: /usr/local/apache-activemq-5.1.0
Loading message broker from: xbean:activemq.xml
INFO BrokerService - Using Persistence Adapter: AMQPersistenceAdapter(/usr/local/apache-activemq-5.1.0/data)
INFO BrokerService - ActiveMQ 5.1.0 JMS Message Broker (localhost) is starting
INFO BrokerService - For help or more information please see: http://activemq.apache.org/
INFO AMQPersistenceAdapter - AMQStore starting using directory: /usr/local/apache-activemq-5.1.0/data
INFO KahaStore - Kaha Store using data directory /usr/local/apache-activemq-5.1.0/data/kr-store/state
INFO AMQPersistenceAdapter - Active data files: []
INFO KahaStore - Kaha Store using data directory /usr/local/apache-activemq-5.1.0/data/kr-store/data
INFO TransportServerThreadSupport - Listening for connections at: tcp://saturn.xyzcorp.com:61616
INFO TransportConnector - Connector openwire Started
INFO TransportServerThreadSupport - Listening for connections at: ssl://saturn.xyzcorp.com:61617
INFO TransportConnector - Connector ssl Started
INFO TransportServerThreadSupport - Listening for connections at: stomp://saturn.xyzcorp.com:61613
INFO TransportConnector - Connector stomp Started
INFO TransportServerThreadSupport - Listening for connections at: xmpp://saturn.xyzcorp.com:61222
INFO TransportConnector - Connector xmpp Started
INFO NetworkConnector - Network Connector default-nc Started
INFO BrokerService - ActiveMQ JMS Message Broker (localhost, ID:saturn.xyzcorp.com-37887-1213893891471-0:0) started
INFO log - Logging to org.slf4j.impl.JCLLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
INFO log - jetty-6.1.9
INFO DiscoveryNetworkConnector - Establishing network connection between from vm://localhost to tcp://mercury.xyzcorp.com:61616
INFO TransportConnector - Connector vm://localhost Started
INFO DemandForwardingBridge - Network connection between vm://localhost#0 and tcp://mercury.xyzcorp.com/10.10.1.53:61616(localhost) has been established.
INFO WebConsoleStarter - ActiveMQ WebConsole initialized.
INFO /admin - Initializing Spring FrameworkServlet 'dispatcher'
INFO log - ActiveMQ Console at http://0.0.0.0:8161/admin
INFO log - ActiveMQ Web Demos at http://0.0.0.0:8161/demo
INFO log - RESTful file access application at http://0.0.0.0:8161/fileserver
INFO log - Started [EMAIL PROTECTED]:8161
INFO FailoverTransport - Successfully connected to tcp://localhost:61616

####################################################
# Broker 2 Startup
####################################################
[EMAIL PROTECTED] bin]$ ./activemq
ACTIVEMQ_HOME: /usr/local/apache-activemq-5.1.0
ACTIVEMQ_BASE: /usr/local/apache-activemq-5.1.0
Loading message broker from: xbean:activemq.xml
INFO BrokerService - Using Persistence Adapter: AMQPersistenceAdapter(/usr/local/apache-activemq-5.1.0/data)
INFO BrokerService - ActiveMQ 5.1.0 JMS Message Broker (localhost) is starting
INFO BrokerService - For help or more information please see: http://activemq.apache.org/
INFO AMQPersistenceAdapter - AMQStore starting using directory: /usr/local/apache-activemq-5.1.0/data
INFO KahaStore - Kaha Store using data directory /usr/local/apache-activemq-5.1.0/data/kr-store/state
INFO AMQPersistenceAdapter - Active data files: []
INFO KahaStore - Kaha Store using data directory /usr/local/apache-activemq-5.1.0/data/kr-store/data
INFO TransportServerThreadSupport - Listening for connections at: tcp://mercury.xyzcorp.com:61616
INFO TransportConnector - Connector openwire Started
INFO TransportServerThreadSupport - Listening for connections at: ssl://mercury.xyzcorp.com:61617
INFO TransportConnector - Connector ssl Started
INFO TransportServerThreadSupport - Listening for connections at: stomp://mercury.xyzcorp.com:61613
INFO TransportConnector - Connector stomp Started
INFO TransportServerThreadSupport - Listening for connections at: xmpp://mercury.xyzcorp.com:61222
INFO TransportConnector - Connector xmpp Started
INFO NetworkConnector - Network Connector default-nc Started
INFO BrokerService - ActiveMQ JMS Message Broker (localhost, ID:mercury.xyzcorp.com-47412-1213893913409-0:0) started
INFO log - Logging to org.slf4j.impl.JCLLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
INFO log - jetty-6.1.9
INFO DiscoveryNetworkConnector - Establishing network connection between from vm://localhost to tcp://saturn.xyzcorp.com:61616
INFO TransportConnector - Connector vm://localhost Started
INFO DemandForwardingBridge - Network connection between vm://localhost#0 and tcp://saturn.xyzcorp.com/10.10.1.61:61616(localhost) has been established.
INFO WebConsoleStarter - ActiveMQ WebConsole initialized.
INFO /admin - Initializing Spring FrameworkServlet 'dispatcher'
INFO log - ActiveMQ Console at http://0.0.0.0:8161/admin
INFO log - ActiveMQ Web Demos at http://0.0.0.0:8161/demo
INFO log - RESTful file access application at http://0.0.0.0:8161/fileserver
INFO log - Started [EMAIL PROTECTED]:8161
INFO FailoverTransport - Successfully connected to tcp://localhost:61616

####################################################
# Publisher
####################################################
import java.util.Date;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author kaveh
 *
 * Creates a publisher that sends persistent messages to a topic
 */
public class AMQTopicPublisher {

    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = "discovery://(multicast://default)";
    private String topic = "TEST.FOO";
    private boolean transacted = false;
    private long sleepTime = 1000;
    private Connection connection;
    private Destination destination;

    /**
     * @param args
     */
    public static void main(String[] args) {
        AMQTopicPublisher topicPublisher = new AMQTopicPublisher();
        topicPublisher.run();
    }

    public void run() {
        try {
            // Create the connection.
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            // Create the session
            Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic(topic);

            // Create the persistent producer
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // Send messages, one per sleepTime milliseconds
            for (int i = 0; i < 100; i++) {
                TextMessage message = session.createTextMessage("Message: " + i + " sent at: " + new Date());
                producer.send(message);
                System.out.println("Message " + i + " sent.");
                Thread.sleep(sleepTime);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Finished.");
            System.exit(1);
        }
    }
}

####################################################
# Subscriber
####################################################
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author kaveh
 *
 * Creates a durable topic subscriber
 */
public class AMQTopicSubscriber implements MessageListener, ExceptionListener {

    private Session session;
    private Destination destination;
    private MessageConsumer consumer;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = "discovery://(multicast://default)";
    private String topic = "TEST.FOO";
    private boolean transacted = false;
    private String clientId = "Client1";
    private int ackMode = Session.AUTO_ACKNOWLEDGE;
    private String consumerName = "Consumer1";
    private boolean verbose = true;

    /**
     * @param args
     */
    public static void main(String[] args) {
        AMQTopicSubscriber topicSubscriber = new AMQTopicSubscriber();
        topicSubscriber.run();
    }

    public void run() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        Connection connection;
        try {
            // create and start connection
            connection = connectionFactory.createConnection();
            connection.setClientID(clientId);
            connection.setExceptionListener(this);
            connection.start();

            // create session and durable consumer
            session = connection.createSession(transacted, ackMode);
            destination = session.createTopic(topic);
            consumer = session.createDurableSubscriber((Topic) destination, consumerName);
            consumer.setMessageListener(this);
            System.out.println("Connected to topic.");

            // Keep the main thread alive without spinning; messages arrive via onMessage
            while (true) {
                Thread.sleep(1000);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                if (verbose) {
                    String msg = txtMsg.getText();
                    if (msg.length() > 50) {
                        msg = msg.substring(0, 50) + "...";
                    }
                    System.out.println("Received: " + msg);
                }
            } else {
                if (verbose) {
                    System.out.println("Received: " + message);
                }
            }

            if (transacted) {
                session.commit();
            } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
                message.acknowledge();
            }
        } catch (JMSException e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    @Override
    public void onException(JMSException exception) {
        System.out.println("JMS Exception occurred.");
        exception.printStackTrace();
    }
}
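
In case it helps anyone reproduce this, here is a rough sketch of how the gap could be checked automatically instead of by reading the consumer output. It is not part of the ActiveMQ examples: the class name MessageGapCheck and the method missing are made up for illustration, and it assumes the "Message: N sent at: ..." text format that the publisher above produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of ActiveMQ or the example clients.
class MessageGapCheck {

    // Matches the sequence number in lines such as
    // "Received: Message: 21 sent at: Thu Jun 19 12:48:14 EDT 2008".
    private static final Pattern SEQ = Pattern.compile("Message: (\\d{1,9}) sent at:");

    /** Returns the sequence numbers in [0, expectedCount) that never appear in the given lines. */
    static List<Integer> missing(List<String> lines, int expectedCount) {
        boolean[] seen = new boolean[expectedCount];
        for (String line : lines) {
            Matcher m = SEQ.matcher(line);
            while (m.find()) {
                int n = Integer.parseInt(m.group(1));
                if (n < expectedCount) {
                    seen[n] = true;
                }
            }
        }
        List<Integer> gaps = new ArrayList<>();
        for (int i = 0; i < expectedCount; i++) {
            if (!seen[i]) {
                gaps.add(i);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Simulates the two consumer runs above: 0-10 received, then 21-99.
        List<String> lines = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            lines.add("Received: Message: " + i + " sent at: ...");
        }
        for (int i = 21; i < 100; i++) {
            lines.add("Received: Message: " + i + " sent at: ...");
        }
        System.out.println("Missing: " + missing(lines, 100));
    }
}
```

Fed the captured output of both consumer runs, it should report 11 through 20 as missing for the failing case and an empty list when the durable subscription behaves as expected.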
