I was using 'PERSISTENT' mode on publisher side, which causes the dead loop
of 'SHUTDOWN message not acked --> redelivery of SHUTDOWN message upon
subscriber start'.



eaglepointe wrote:
> 
> Yes, I'm running 5.1.0 and using default activemq.xml. I've found out the
> cause! It is a bug subscriber code, it closes the connection in
> 'onMessage' upon receiving 'SHUTDOWN' message while using
> 'session.AUTO_ACKNOWLEDGE', so 'SHUTDOWN' message never gets acknowledged,
> which results in everybody appearing weird across
> subscriber/publisher/broker restart. Changing to
> 'session.CLIENT_ACKNOWLEDGE' solved the problem.
> 
> Thanks for the help!
> 
> 
> Dave Stanley wrote:
>> 
>> Not that it helps much, I tried it with a 5.x release and your code works
>> fine. I see the subscription is durable across consumer and broker
>> restarts.
>> Have you modified your activemq.xml, are you running with 5.1.0?
>> 
>> /Dave
>> On Fri, Jun 13, 2008 at 1:05 PM, eaglepointe <[EMAIL PROTECTED]>
>> wrote:
>> 
>>>
>>> Forgot to mention that I also restarted message broker before start
>>> durable
>>> subscriber again.
>>>
>>>
>>> eaglepointe wrote:
>>> >
>>> > Ok, here is what I did,
>>> >
>>> > 1) start message broker
>>> > 2) start a durable subscriber which subscribes "topictest.messages" as
>>> the
>>> > attached code
>>> > 3) start publisher, which sends some messages to "topictest.messages"
>>> in
>>> > 'persistent' mode, and the subscriber receives the messages
>>> > 4) stop the subscriber and publisher
>>> > 5) start publisher again, which sends some messages to
>>> > "topictest.messages"
>>> > 6) stop publisher
>>> > 7) start subscriber again, according to my understanding of durable
>>> > subscription, I'd expect subscriber receives the messages sent by
>>> > publisher, but I got the exceptions instead
>>> >
>>> > Following is a more complete version of subscriber code, which is
>>> really
>>> > the sample code comes with ActiveMQ installation, except for the
>>> durable
>>> > subscription.
>>> >
>>> > The same code works fine, meaning no exceptions, if I don't use
>>> durable
>>> > subscription.
>>> > It seems that durable subscription requires some context in message
>>> broker
>>> > which somehow got lost.
>>> >
>>> >
>>> >
>>> --------------------------------------------------------------------------------------------
>>> >
>>> > public class TopicListener implements MessageListener {
>>> >
>>> >     private Connection connection;
>>> >     private MessageProducer producer;
>>> >     private Session session;
>>> >     private int count;
>>> >     private long start;
>>> >     private Topic topic;
>>> >     private Topic control;
>>> >     //private TopicSubscriber consumer;
>>> >
>>> >     private String url = "tcp://192.168.30.60:61616";
>>> >
>>> >     public static void main(String[] argv) throws Exception {
>>> >         TopicListener l = new TopicListener();
>>> >         String[] unknown = CommandLineSupport.setOptions(l, argv);
>>> >         if (unknown.length > 0) {
>>> >             System.out.println("Unknown options: " +
>>> > Arrays.toString(unknown));
>>> >             System.exit(-1);
>>> >         }
>>> >         l.run();
>>> >     }
>>> >
>>> >     public void run() throws JMSException {
>>> >         ActiveMQConnectionFactory factory = new
>>> > ActiveMQConnectionFactory(url);
>>> >         connection = factory.createConnection();
>>> >         connection.setClientID("myClient");
>>> >         session = connection.createSession(false,
>>> > Session.AUTO_ACKNOWLEDGE);
>>> >         topic = session.createTopic("topictest.messages");
>>> >         control = session.createTopic("topictest.control");
>>> >
>>> >         TopicSubscriber consumer =
>>> > session.createDurableSubscriber(topic,"myDurable1");
>>> >         //MessageConsumer consumer = session.createConsumer(topic);
>>> >         consumer.setMessageListener(this);
>>> >
>>> >         producer = session.createProducer(control);
>>> >
>>> >         System.out.println("Waiting for messages...");
>>> >         connection.start();
>>> >     }
>>> >
>>> ---------------------------------------------------------------------------------
>>> >
>>> > your grateful,
>>> > -Jeff
>>> >
>>> >
>>> >
>>> > Dave Stanley wrote:
>>> >>
>>> >> Your not running this from within a thread by any chance? I don't
>>> >> believe connection.start() is going to block, so try and create the
>>> >> connection outside the thread and pass it in by reference to the
>>> thread
>>> >> instance.
>>> >>
>>> >> HTH
>>> >> /Dave
>>> >>
>>> >>
>>> >> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <[EMAIL PROTECTED]>
>>> >> wrote:
>>> >>
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I'm new to ActiveMQ and having problem with durable subscription,
>>> >>> following
>>> >>> code based on example code,
>>> >>>
>>> >>> ===================================================================
>>> >>>   public void run() throws JMSException {
>>> >>>        ActiveMQConnectionFactory factory = new
>>> >>> ActiveMQConnectionFactory(url);
>>> >>>        connection = factory.createConnection();
>>> >>>        connection.setClientID("myClient");
>>> >>>        session = connection.createSession(false,
>>> >>> Session.AUTO_ACKNOWLEDGE);
>>> >>>        topic = session.createTopic("topictest.messages");
>>> >>>        control = session.createTopic("topictest.control");
>>> >>>
>>> >>>        TopicSubscriber consumer =
>>> >>> session.createDurableSubscriber(topic,"myDurable1");
>>> >>>        //MessageConsumer consumer = session.createConsumer(topic);
>>> >>>        consumer.setMessageListener(this);
>>> >>>
>>> >>>        producer = session.createProducer(control);
>>> >>>
>>> >>>        System.out.println("Waiting for messages...");
>>> >>>        connection.start();
>>> >>>    }
>>> >>> =================================================================
>>> >>>
>>> >>> is causing exceptions on broker side and client side, below are the
>>> >>> exceptions
>>> >>>
>>> >>> broker side:
>>> >>>
>>> >>> ERROR Service                        - Async error occurred:
>>> >>> java.lang.NullPointerException
>>> >>> java.lang.NullPointerException
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479)
>>> >>>        at
>>> >>>
>>> org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>>> >>>        at
>>> >>>
>>> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
>>> >>>        at
>>> >>>
>>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
>>> >>>        at java.lang.Thread.run(Thread.java:619)
>>> >>>
>>> >>> client side:
>>> >>>
>>> >>> Exception in thread "ActiveMQ Session Task"
>>> >>> java.util.concurrent.RejectedExecutionException
>>> >>>        at
>>> >>>
>>> >>>
>>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown
>>> >>> Source)
>>> >>>        at java.util.concurrent.ThreadPoolExecutor.reject(Unknown
>>> Source)
>>> >>>        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown
>>> >>> Source)
>>> >>>        at
>>> >>>
>>> >>>
>>> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144)
>>> >>>        at
>>> >>>
>>> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>>> >>>        at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
>>> >>> Source)
>>> >>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>>> >>> Source)
>>> >>>        at java.lang.Thread.run(Unknown Source)
>>> >>>
>>> >>>
>>> >>> Any idea? Thanks in advance!
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> >>>
>>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17813954.html
>>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >>>
>>> >>>
>>> >>
>>> >>
>>> >
>>> >
>>>
>>> --
>>> View this message in context:
>>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17828072.html
>>>  Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>
>>>
>> 
>> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17833849.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to