Tim fixed some issues around the cursors for durable subs. Can you try your tests on a recent snapshot?

On Wed, Jun 5, 2013 at 11:25 AM, Yi Pan <[email protected]> wrote:

> Hi, all,
>
> Has anyone encountered a problem with
> VMPendingDurableSubscriberMessageStoragePolicy that it loses persistent
> messages when the subscriber re-connects?
>
> We have the following test case fails with
> VMPendingDurableSubscriberMessageStoragePolicy set as default destination
> policy:
> 1) publish persistent messages:
>     void publish(String brokerUrl) {
>         try {
>             ConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
>             connection = factory.createTopicConnection();
>             connection.setClientID(CLIENT_ID);
>
>             /**
>              * Create a JMS session
>              */
>             TopicSession session = connection.createTopicSession(false,
>                     Session.AUTO_ACKNOWLEDGE);
>
>             /**
>              * Fully Qualified Topic Name
>              */
>             String fullyQualifiedTopicName = this.topicName;
>
>             /**
>              * Get the topic instance from the topic name.
>              */
>             Topic topic = session.createTopic(fullyQualifiedTopicName);
>
>             /**
>              * Create a topic subscriber to the topic.
>              */
>             TopicSubscriber ts = session.createDurableSubscriber(topic,
>                     DURABLE_SUB_NAME);
>
>             /**
>              * Create a publisher to publish messages to the topic
>              */
>             TopicPublisher publisher = session.createPublisher(topic);
>
>             int deliveryMode = DeliveryMode.PERSISTENT;
>
>             /**
>              * Send and receive 10 messages to the topic.
>              */
>             for (int i = 0; i < 10; i++) {
>                 TextMessage tm = session.createTextMessage("msg-" + i);
>
>                 LOG.info("publishing msg to broker, msg=" + tm.getText());
>
>                 publisher.publish(topic, tm, deliveryMode, 4, 0);
>             }
>
>             Thread.sleep(1000);
>
>             publisher.close();
>             session.close();
>
>         } catch (Exception e) {
>             LOG.error(e.getMessage(), e);
>         } finally {
>             try {
>                 connection.close();
>             } catch (Exception e) {
>                 ;
>             }
>         }
>     }
> 2) after the messages are published, subscriber connects to receive the
> previously published messages:
>     void consume(String brokerUrl) {
>         try {
>             ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
>             /**
>              * Create a topic connection from the obtained connection factory.
>              */
>             connection = factory.createTopicConnection();
>
>             connection.setClientID(CLIENT_ID);
>
>             /**
>              * Create a JMS session
>              */
>             TopicSession session = connection.createTopicSession(false,
>                     Session.AUTO_ACKNOWLEDGE);
>
>             /**
>              * Fully Qualified Topic Name
>              */
>             String fullyQualifiedTopicName = this.topicLocalName;
>
>             /**
>              * Get the topic instance from the topic name.
>              */
>             Topic topic = session.createTopic(fullyQualifiedTopicName);
>
>             /**
>              * Create a topic subscriber to the topic.
>              */
>             TopicSubscriber ts = session.createDurableSubscriber(topic,
>                     DURABLE_SUB_NAME);
>
>             /**
>              * Start the connection - message will be delivered to the
>              * subscribers.
>              */
>             connection.start();
>
>             /**
>              * Send and receive 10 messages to the topic.
>              */
>             for (int i = 0; i < 10; i++) {
>
>                 TextMessage tm2 = (TextMessage) ts.receive();
>
>                 if (tm2 == null) {
>                     throw new RuntimeException("message validation failed.");
>                 }
>
>                 LOG.info("receive msg from broker, msg received="
>                         + tm2.getText());
>
>             }
>
>             Thread.sleep(1000);
>
>         } catch (Exception e) {
>             LOG.error(e.getMessage(), e);
>         } finally {
>             try {
>                 connection.close();
>             } catch (Exception e) {
>                 ;
>             }
>         }
>     }
>
> We noticed that when the default destination policy is set to
> VMPendingDurableSubscriberMessageStoragePolicy, the above consume() function
> fails and stuck at the blocking call:
>     TextMessage tm2 = (TextMessage) ts.receive();
>
> Has anyone encountered this issue before? Our version of ActiveMQ is 5.7.0
>
> Thanks a lot!
>
> -Yi
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.

--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
