Hi, all,

Has anyone seen VMPendingDurableSubscriberMessageStoragePolicy lose
persistent messages when a durable subscriber reconnects?

The following test case fails when
VMPendingDurableSubscriberMessageStoragePolicy is set as the default
destination policy:
1) Publish persistent messages:
              void publish(String brokerUrl) {
                  try {
                        ActiveMQConnectionFactory factory =
                                        new ActiveMQConnectionFactory(brokerUrl);
                        connection = factory.createTopicConnection();
                        connection.setClientID(CLIENT_ID);

                        /**
                         * Create a JMS session
                         */
                        TopicSession session = connection.createTopicSession(false,
                                        Session.AUTO_ACKNOWLEDGE);

                        /**
                         * Fully Qualified Topic Name
                         */
                        String fullyQualifiedTopicName = this.topicName;

                        /**
                         * Get the topic instance from the topic name.
                         */
                        Topic topic = session.createTopic(fullyQualifiedTopicName);

                        /**
                         * Create a topic subscriber to the topic.
                         */
                        TopicSubscriber ts = session.createDurableSubscriber(topic,
                                        DURABLE_SUB_NAME);

                        /**
                         * Create a publisher to publish messages to the topic
                         */
                        TopicPublisher publisher = session.createPublisher(topic);

                        int deliveryMode = DeliveryMode.PERSISTENT;

                        /**
                         * Publish 10 persistent messages to the topic.
                         */
                        for (int i = 0; i < 10; i++) {
                                TextMessage tm = session.createTextMessage("msg-" + i);

                                LOG.info("publishing msg to broker, msg=" + tm.getText());

                                publisher.publish(topic, tm, deliveryMode, 4, 0);
                        }

                        Thread.sleep(1000);

                        publisher.close();
                        session.close();

                } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                } finally {
                        try {

                                connection.close();
                        } catch (Exception e) {
                                // ignore failures while closing
                        }
                }
           }
2) After the messages are published, the subscriber reconnects to receive
the previously published messages:
    void consume(String brokerUrl) {
                try {
                        ActiveMQConnectionFactory factory =
                                        new ActiveMQConnectionFactory(brokerUrl);
                        /**
                         * Create a topic connection from the obtained connection factory.
                         */
                        connection = factory.createTopicConnection();

                        connection.setClientID(CLIENT_ID);

                        /**
                         * Create a JMS session
                         */
                        TopicSession session = connection.createTopicSession(false,
                                        Session.AUTO_ACKNOWLEDGE);

                        /**
                         * Fully Qualified Topic Name
                         */
                        String fullyQualifiedTopicName = this.topicLocalName;

                        /**
                         * Get the topic instance from the topic name.
                         */
                        Topic topic = session.createTopic(fullyQualifiedTopicName);

                        /**
                         * Create a topic subscriber to the topic.
                         */
                        TopicSubscriber ts = session.createDurableSubscriber(topic,
                                        DURABLE_SUB_NAME);

                        /**
                         * Start the connection - messages will be delivered to the
                         * subscribers.
                         */
                        connection.start();

                        /**
                         * Receive the 10 messages published earlier.
                         */
                        for (int i = 0; i < 10; i++) {

                                TextMessage tm2 = (TextMessage) ts.receive();

                                if (tm2 == null) {
                                        throw new RuntimeException("message validation failed.");
                                }

                                LOG.info("receive msg from broker, msg received="
                                                + tm2.getText());

                        }

                        Thread.sleep(1000);

                } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                } finally {
                        try {
                                connection.close();
                        } catch (Exception e) {
                                // ignore failures while closing
                        }
                }
    }
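
Note that ts.receive() with no timeout blocks until a message arrives, so
the null check in the loop above is reached only if the consumer is closed.
One way to make the test fail fast and report which messages are missing is
to read with a timeout and compare against the expected texts. The sketch
below is illustrative only: DrainCheck and missing() are hypothetical names,
and the message source is abstracted as a LongFunction<String> so that the
snippet has no JMS dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/** Hypothetical helper, not part of the original test case. */
final class DrainCheck {

    private DrainCheck() {
    }

    /**
     * Reads up to `expected` messages from `source`, stopping as soon as one
     * read times out (signalled by a null return), and returns the expected
     * texts ("msg-0", "msg-1", ...) that never arrived.
     *
     * `source` takes a timeout in milliseconds and returns the next message
     * text, or null if nothing arrived within that time.
     */
    static List<String> missing(LongFunction<String> source, int expected,
            long timeoutMillis) {
        List<String> wanted = new ArrayList<>();
        for (int i = 0; i < expected; i++) {
            wanted.add("msg-" + i);
        }
        for (int i = 0; i < expected; i++) {
            String text = source.apply(timeoutMillis);
            if (text == null) {
                break; // timed out: stop instead of blocking forever
            }
            wanted.remove(text);
        }
        return wanted;
    }
}
```

In consume(), the source could wrap ts.receive(timeoutMillis), which returns
null when the timeout expires, converting each TextMessage to its text and
rethrowing JMSException as an unchecked exception. An empty result would
mean all 10 messages were delivered.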

We noticed that when the default destination policy is set to
VMPendingDurableSubscriberMessageStoragePolicy, the consume() function above
never completes; it gets stuck at the blocking call:
               TextMessage tm2 = (TextMessage) ts.receive();
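
For anyone trying to reproduce this, a default destination policy of this
kind can be set programmatically on an embedded broker roughly as follows.
This is a minimal sketch under assumptions, not the exact configuration used
in the test: the class name EmbeddedBrokerSketch and the connector URL are
placeholders, and the ActiveMQ broker jars must be on the classpath.

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;

public class EmbeddedBrokerSketch {
    public static void main(String[] args) throws Exception {
        // Policy entry that uses the VM policy for pending
        // durable-subscriber messages.
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setPendingDurableSubscriberPolicy(
                new VMPendingDurableSubscriberMessageStoragePolicy());

        // Install it as the default entry so it applies to every
        // destination that has no more specific policy.
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);

        BrokerService broker = new BrokerService();
        broker.setDestinationPolicy(policyMap);
        broker.addConnector("tcp://localhost:61616"); // placeholder URL
        broker.start();
        broker.waitUntilStopped();
    }
}
```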

Has anyone encountered this issue before? We are running ActiveMQ 5.7.0.

Thanks a lot!

-Yi



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
