I can't speak to the question about closing the session (I use Camel to interact with ActiveMQ, so I don't directly deal with the session), but even if that explains the increasing msgsHeld value, it doesn't explain the producer slowing down and eventually stopping its sends. So there might be multiple problems here.
What code calls your publish() method periodically? And can you find out (e.g. by taking the thread dump Art referenced) whether you're blocked on the send() call or somewhere else when it all comes to a halt? Also, I was under the impression that it wasn't possible to set message TTL on the broker and that it could only be set by the producer; can you provide a reference to the documentation for the setting you're using?

On Jan 13, 2015 1:03 AM, "xabhi" <xabh...@gmail.com> wrote:

> Thanks for the reply.
>
> I have disabled producer flow control on both topics and queues in my
> broker configuration and I have message TTL specified on broker side.
>
> The destination on which heartbeats are sent is a Queue on which 5
> concurrent consumers are listening and the reply to destination is a topic
> on which there are multiple subscribers.
>
> When all this went down the topic destination enqueue count was not
> increasing.
>
> I am pasting the code for consumer and producer:
>
> *Producer:*
>
> public boolean publish()
> {
>     String message = "Heartbeat message";
>     boolean responseReceived = false;
>
>     Connection connection = null;
>     Session session = null;
>
>     try
>     {
>         connection = myJmsTemplate.getConnectionFactory().createConnection();
>         session = connection.createSession(transacted, ackMode);
>
>         String correlationId = null;
>         Long timeStamp = System.currentTimeMillis();
>         Random random = new Random(timeStamp);
>
>         Integer randomPart = random.nextInt(Integer.MAX_VALUE);
>         Long threadId = Thread.currentThread().getId();
>         correlationId = threadId + "_" + timeStamp + "_" + randomPart;
>
>         String messageSelector = "JMSCorrelationID='" + correlationId + "'";
>         MessageConsumer responseConsumer =
>             session.createConsumer(receiveDestination, messageSelector);
>         connection.start();
>
>         // send a text message to broker
>         myJmsTemplate.send(sendDestination,
>             new SimpleTextMessageCreator(message, receiveDestination, correlationId));
>
>         LOG.debug("Waiting for message with " + messageSelector + " for " +
>             DEFAULT_TIMEOUT + " ms");
>
>         // check for response from broker, DEFAULT_TIMEOUT is 60s.
>         TextMessage responseMessage = (TextMessage)
>             responseConsumer.receive(DEFAULT_TIMEOUT);
>         if (responseMessage != null)
>         {
>             if (!responseMessage.getJMSCorrelationID().equals(correlationId))
>             {
>                 String errorMsg =
>                     "Invalid correlation id in response message!!! " +
>                     "Expected : " + correlationId +
>                     " but received : " +
>                     responseMessage.getJMSCorrelationID();
>
>                 LOG.error(errorMsg);
>                 responseReceived = false;
>             }
>             else {
>                 LOG.debug("Recieved the response back: " +
>                     responseMessage.getText());
>                 LOG.debug("Correlation id of response message : " +
>                     responseMessage.getJMSCorrelationID());
>                 responseReceived = true;
>             }
>         }
>     }
>     catch (JMSException e)
>     {
>         LOG.error("Error interacting with broker", e);
>     }
>     catch (Throwable t) {
>         LOG.warn("Publish encountered unknown exception.", t);
>     }
>     finally {
>         JmsUtil.closeConnection(connection, session,
>             this.getClass().getName());
>     }
>     return responseReceived;
> }
>
>
> *Listener/Consumer:*
>
> public class HeartBeatListener implements SessionAwareMessageListener
> {
>
>     private final Log LOG = LogFactory.getLog(this.getClass());
>
>     @Override
>     public void onMessage(Message message, Session session) throws JMSException
>     {
>         if (!(message instanceof TextMessage)) {
>             throw new IllegalArgumentException(
>                 "Message must be of type TextMessage: " + message);
>         }
>
>         String replyTextMessage = "Heartbeat Ack.";
>
>         try
>         {
>             TextMessage textMessage = (TextMessage) message;
>             String msg = textMessage.getText();
>
>             LOG.debug("Received heart beat message : " + msg);
>
>             // Send the response to the destination specified by the
>             // 'JMSReplyTo' field of the received message.
>             Destination responseDest = message.getJMSReplyTo();
>             if (responseDest != null)
>             {
>                 LOG.debug("Sending response to destination" +
>                     responseDest.toString());
>
>                 // Setup a message producer for the above destination
>                 MessageProducer producer =
>                     session.createProducer(responseDest);
>                 TextMessage responseMessage =
>                     session.createTextMessage(replyTextMessage);
>
>                 responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
>
>                 // Send the response back
>                 producer.send(responseMessage);
>                 LOG.debug("Heart Beat Response Sent: " + responseMessage);
>             }
>         }
>         catch (JMSException e)
>         {
>             LOG.error("Error while processing the message " + message, e);
>         }
>     }
> }
>
> This listener is used in a DMLC.
>
> Hi art,
> Here the session is created on the producer side (in the publish()
> function) and on listener side a session aware listener is used. My
> question was that what if the session on publisher side is closed before
> the session aware listener is even called or is in process of executing
> onMessage()?
>
> Please tell me if I am doing anything wrong here?
>
> Thanks for all the help.
> -Abhi
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Consumer-not-able-to-consume-messages-from-queue-tp4689594p4689835.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
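
P.S. On the thread dump question: jstack against the producer's process is the usual way to get one, but if attaching a tool is awkward, a rough in-process check along the following lines might be enough to tell whether a thread is sitting in send() when things stall. This is only a sketch using the standard Thread.getAllStackTraces() call; the class name SendBlockCheck and the helper threadsInMethod are made up for illustration, and matching on the bare method name "send" is an assumption that will also catch unrelated methods with that name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of ActiveMQ or Spring.
public class SendBlockCheck {

    // Returns one description per live thread whose current stack
    // contains a frame for a method with the given name.
    public static List<String> threadsInMethod(String methodName) {
        List<String> hits = new ArrayList<>();
        Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
            Thread thread = entry.getKey();
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getMethodName().equals(methodName)) {
                    // Record thread name, state, and the first matching frame.
                    hits.add(thread.getName() + " [" + thread.getState() + "] at " + frame);
                    break;
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Assumption: the blocked call shows up as a frame named "send".
        for (String line : threadsInMethod("send")) {
            System.out.println(line);
        }
    }
}
```

Calling threadsInMethod("send") from a scheduled task or a JMX operation once the heartbeats stop would show whether any thread is inside a send() frame and what state it is in; if the list is empty, the stall is presumably somewhere else (for instance in receive()).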