Hi, all,
Has anyone encountered a problem where
VMPendingDurableSubscriberMessageStoragePolicy loses persistent messages
when the subscriber reconnects?
The following test case fails when
VMPendingDurableSubscriberMessageStoragePolicy is set as the default
destination policy:
1) Publish persistent messages:
    void publish(String brokerUrl) {
        try {
            // ActiveMQConnectionFactory also implements
            // TopicConnectionFactory, which provides createTopicConnection().
            ActiveMQConnectionFactory factory =
                    new ActiveMQConnectionFactory(brokerUrl);
            connection = factory.createTopicConnection();
            connection.setClientID(CLIENT_ID);

            // Create a JMS session.
            TopicSession session =
                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            // Fully qualified topic name.
            String fullyQualifiedTopicName = this.topicName;

            // Get the topic instance from the topic name.
            Topic topic = session.createTopic(fullyQualifiedTopicName);

            // Create the durable subscriber so that the subscription is
            // registered before any message is published.
            TopicSubscriber ts =
                    session.createDurableSubscriber(topic, DURABLE_SUB_NAME);

            // Create a publisher to publish messages to the topic.
            TopicPublisher publisher = session.createPublisher(topic);
            int deliveryMode = DeliveryMode.PERSISTENT;

            // Publish 10 persistent messages (priority 4, no expiry).
            for (int i = 0; i < 10; i++) {
                TextMessage tm = session.createTextMessage("msg-" + i);
                LOG.info("publishing msg to broker, msg=" + tm.getText());
                publisher.publish(topic, tm, deliveryMode, 4, 0);
            }
            Thread.sleep(1000);
            publisher.close();
            session.close();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            try {
                connection.close();
            } catch (Exception e) {
                // Ignore failures while closing.
            }
        }
    }
2) After the messages are published, the subscriber connects to receive
the previously published messages:
    void consume(String brokerUrl) {
        try {
            ActiveMQConnectionFactory factory =
                    new ActiveMQConnectionFactory(brokerUrl);

            // Create a topic connection from the connection factory.
            connection = factory.createTopicConnection();
            connection.setClientID(CLIENT_ID);

            // Create a JMS session.
            TopicSession session =
                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            // Fully qualified topic name; this must resolve to the same
            // topic that publish() used.
            String fullyQualifiedTopicName = this.topicLocalName;

            // Get the topic instance from the topic name.
            Topic topic = session.createTopic(fullyQualifiedTopicName);

            // Re-attach to the durable subscription created in publish().
            TopicSubscriber ts =
                    session.createDurableSubscriber(topic, DURABLE_SUB_NAME);

            // Start the connection; messages will now be delivered to the
            // subscriber.
            connection.start();

            // Receive the 10 previously published messages.
            for (int i = 0; i < 10; i++) {
                TextMessage tm2 = (TextMessage) ts.receive();
                if (tm2 == null) {
                    throw new RuntimeException("message validation failed.");
                }
                LOG.info("receive msg from broker, msg received="
                        + tm2.getText());
            }
            Thread.sleep(1000);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            try {
                connection.close();
            } catch (Exception e) {
                // Ignore failures while closing.
            }
        }
    }
We noticed that when the default destination policy is set to
VMPendingDurableSubscriberMessageStoragePolicy, the consume() function
above does not complete: it hangs at the blocking call
    TextMessage tm2 = (TextMessage) ts.receive();
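For diagnosing this, a bounded variant of the receive loop may help. The
no-argument receive() blocks until a message arrives, whereas the JMS
receive(long timeout) overload returns null once a positive timeout
expires (a timeout of 0 also blocks indefinitely). A lost message then
shows up as a short count rather than a hang. Below is a minimal sketch of
such a loop. To keep it self-contained, the timed receive is passed in as a
function; BoundedReceive, drain, and the parameter names are made up for
illustration and are not part of JMS or ActiveMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical helper for illustration; not part of JMS or ActiveMQ.
final class BoundedReceive {

    /**
     * Collects up to `expected` messages and stops at the first timeout.
     *
     * `receiveWithTimeout` is a stand-in for a timed receive such as
     * ts.receive(timeoutMillis): it takes a timeout in milliseconds and
     * returns null when no message arrives in time.
     */
    static <T> List<T> drain(LongFunction<T> receiveWithTimeout,
                             int expected, long timeoutMillis) {
        List<T> received = new ArrayList<>();
        for (int i = 0; i < expected; i++) {
            T msg = receiveWithTimeout.apply(timeoutMillis);
            if (msg == null) {
                break; // timed out: report a short count instead of blocking
            }
            received.add(msg);
        }
        return received;
    }
}
```

In consume(), this could be called with expected = 10, a timeout of a few
seconds, and a lambda that wraps ts.receive(timeout) and handles the
checked JMSException. A result with fewer than 10 entries would then point
to messages that were never delivered.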
Has anyone encountered this issue before? We are running ActiveMQ 5.7.0.
Thanks a lot!
-Yi
--
View this message in context:
http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.