Thanks for the reply.

I have disabled producer flow control on both topics and queues in my broker
configuration, and I have a message TTL specified on the broker side.

The destination to which heartbeats are sent is a queue with 5 concurrent
consumers listening on it, and the reply-to destination is a topic with
multiple subscribers.

When all this went down, the enqueue count of the topic destination was not
increasing.

I am pasting the code for the producer and the consumer:

*Producer:*

public boolean publish()
{
    String message = "Heartbeat message";
    boolean responseReceived = false;

    Connection connection = null;
    Session session = null;

    try
    {
      connection = myJmsTemplate.getConnectionFactory().createConnection();
      session    = connection.createSession(transacted, ackMode);

      String correlationId = null;
      Long   timeStamp     = System.currentTimeMillis();
      Random random        = new Random(timeStamp);

      Integer randomPart = random.nextInt(Integer.MAX_VALUE);
      Long    threadId   = Thread.currentThread().getId();
      correlationId      = threadId + "_" + timeStamp + "_" + randomPart;

      String messageSelector = "JMSCorrelationID='" + correlationId + "'";
      MessageConsumer responseConsumer =
          session.createConsumer(receiveDestination, messageSelector);
      connection.start();

      // send a text message to broker
      myJmsTemplate.send(sendDestination,
          new SimpleTextMessageCreator(message, receiveDestination, correlationId));

      LOG.debug("Waiting for message with " + messageSelector + " for " +
          DEFAULT_TIMEOUT + " ms");

      // check for response from broker, DEFAULT_TIMEOUT is 60s.
     TextMessage responseMessage =
         (TextMessage) responseConsumer.receive(DEFAULT_TIMEOUT);
     if (responseMessage != null)
     {
         if (!responseMessage.getJMSCorrelationID().equals(correlationId)) {
             String errorMsg =
                 "Invalid correlation id in response message!!! " +
                 "Expected : " + correlationId +
                 " but received : " + responseMessage.getJMSCorrelationID();

             LOG.error(errorMsg);
             responseReceived = false;
         }
         else {
             LOG.debug("Received the response back: " +
                 responseMessage.getText());
             LOG.debug("Correlation id of response message : " +
                 responseMessage.getJMSCorrelationID());
             responseReceived = true;
         }
     }
    }
    catch (JMSException e)
    {
     LOG.error("Error interacting with broker", e);
    }
    catch (Throwable t) {
     LOG.warn("Publish encountered unknown exception.", t);
    }
    finally {
     JmsUtil.closeConnection(connection, session, this.getClass().getName());
    }
    return responseReceived;
}
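
For reference, here is a standalone sketch of how the correlation id and the
selector string in publish() are put together, so the format can be checked
without a broker. The class and method names (CorrelationIdSketch,
newCorrelationId, selectorFor) are made up for illustration. It also swaps in
java.util.UUID for the time-seeded Random, which is an assumption and not what
the code above does.

```java
import java.util.UUID;

public class CorrelationIdSketch {

    // Builds an id of the form <threadId>_<timestamp>_<uuid>.
    // UUID replaces the time-seeded Random from publish(); that swap is an assumption.
    static String newCorrelationId() {
        long threadId = Thread.currentThread().getId();
        long timeStamp = System.currentTimeMillis();
        return threadId + "_" + timeStamp + "_" + UUID.randomUUID();
    }

    // Builds the selector string used for the response consumer.
    // A single quote inside a selector string literal is escaped by doubling it.
    static String selectorFor(String correlationId) {
        return "JMSCorrelationID='" + correlationId.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        String id = newCorrelationId();
        System.out.println(selectorFor(id));
    }
}
```

The ids generated in publish() never contain a quote, so the escaping is only
there to keep the helper safe if the id format changes.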


*Listener/Consumer:*

public class HeartBeatListener implements SessionAwareMessageListener
{

     private final Log LOG = LogFactory.getLog(this.getClass());

     @Override
     public void onMessage(Message message, Session session) throws JMSException
     {
         if (!(message instanceof TextMessage)) {
             throw new IllegalArgumentException(
                 "Message must be of type TextMessage: " + message);
         }

         String replyTextMessage = "Heartbeat Ack.";

         try
         {
             TextMessage textMessage = (TextMessage) message;
             String msg = textMessage.getText();

             LOG.debug("Received heart beat message : " + msg);

             // Send the response to the destination specified by the
             // 'JMSReplyTo' field of the received message.
             Destination responseDest = message.getJMSReplyTo();
             if (responseDest != null)
             {
                 LOG.debug("Sending response to destination " +
                     responseDest.toString());

                 // Setup a message producer for the above destination
                 MessageProducer producer = session.createProducer(responseDest);
                 TextMessage responseMessage =
                     session.createTextMessage(replyTextMessage);

                 responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());

                 // Send the response back
                 producer.send(responseMessage);
                 LOG.debug("Heart Beat Response Sent: " + responseMessage);
             }
         }
         catch (JMSException e)
         {
             LOG.error("Error while processing the message " + message, e);
         }
     }
}

This listener is used in a DMLC (DefaultMessageListenerContainer).

Hi art,
Here the session is created on the producer side (in the publish() function),
and on the listener side a session-aware listener is used. My question is:
what happens if the session on the publisher side is closed before the
session-aware listener is even called, or while it is still executing
onMessage()?
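
To make the timing I am asking about concrete, here is a toy in-memory model.
It does not use the JMS API. ToyTopic and requestReply are hypothetical names,
and the model assumes the reply consumer behaves like a non-durable topic
subscriber, i.e. it only sees messages published while it is subscribed. That
is my understanding and may be wrong.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReplyTimingSketch {

    // Toy stand-in for a topic with non-durable subscribers (not the JMS API).
    static class ToyTopic {
        private final List<BlockingQueue<String>> subscribers =
            new CopyOnWriteArrayList<>();

        BlockingQueue<String> subscribe() {
            BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
            subscribers.add(inbox);
            return inbox;
        }

        void unsubscribe(BlockingQueue<String> inbox) {
            subscribers.remove(inbox);
        }

        // Delivers only to inboxes subscribed right now; returns how many got it.
        int publish(String message) {
            int delivered = 0;
            for (BlockingQueue<String> inbox : subscribers) {
                inbox.add(message);
                delivered++;
            }
            return delivered;
        }
    }

    // Mimics publish(): subscribe, wait up to timeoutMs, then close the consumer.
    // replyInTime == false models a listener that answers after the close.
    static String requestReply(boolean replyInTime, long timeoutMs) {
        ToyTopic replyTopic = new ToyTopic();
        BlockingQueue<String> responseConsumer = replyTopic.subscribe();
        if (replyInTime) {
            replyTopic.publish("Heartbeat Ack.");
        }
        String reply = null;
        try {
            reply = responseConsumer.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            replyTopic.unsubscribe(responseConsumer);
        }
        if (!replyInTime) {
            // Late reply: no subscriber is left, so it is dropped.
            replyTopic.publish("Heartbeat Ack.");
        }
        return reply;
    }

    public static void main(String[] args) {
        System.out.println("reply in time: " + requestReply(true, 50));
        System.out.println("reply too late: " + requestReply(false, 50));
    }
}
```

In this model the late reply is simply lost and the publisher sees a timeout,
which is the behaviour I suspect in my setup when onMessage() runs after
publish() has already closed its connection.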

Please tell me if I am doing anything wrong here.

Thanks for all the help.
-Abhi




--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Consumer-not-able-to-consume-messages-from-queue-tp4689594p4689835.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
