Seems like i keep having the issue even after increasing the limit. I wonder what does the 'WriteChecker" message really means. And why can't the connection remain stable? Is there a way to automatically recycle the connection to keep it "fresh"?
Any suggestions appreciated. On Mon, Dec 24, 2012 at 1:01 PM, Jose Martinez <jmarti...@opencrowd.com>wrote: > Thanks! > > I'll set it as follows: > broker.getSystemUsage().getTempUsage().setLimit(1024l*64l); > > And see how it goes. > > > > On Mon, Dec 24, 2012 at 12:03 PM, Raul Kripalani <r...@evosent.com> wrote: > >> You are setting a tempUsage limit of 2 kilobytes. Maybe you expected 2000 >> to mean megabytes, but the unit is bytes. See Javadoc [1]. >> >> In a non-persistent broker, the role of tempUsage is to buffer up >> non-persistent messages when consumers can't keep up. The temp storage >> implementation (PListStore) uses a default journal size of 32mb. So your >> system locks up as soon as it lazy inits the PListStore for the first >> time, >> because tempUsage < default journal file size. >> >> My two cents. Please let us know if this helped. >> >> Thanks, >> >> [1] >> >> http://activemq.apache.org/maven/5.7.0/activemq-core/apidocs/org/apache/activemq/usage/Usage.html#setLimit(long) >> . >> >> *Raúl Kripalani* >> Apache Camel Committer >> Enterprise Architect, Program Manager, Open Source Integration specialist >> http://about.me/raulkripalani | http://www.linkedin.com/in/raulkripalani >> http://blog.raulkr.net | twitter: @raulvk <http://twitter.com/raulvk> >> >> On Mon, Dec 24, 2012 at 4:50 PM, Jose Martinez <jmarti...@opencrowd.com >> >wrote: >> >> > Hello, >> > >> > I've implemented an embedded activemq service and a message listener >> using >> > in activemq 5.7.0. It works fine for a few hours but I've noticed that >> > after a day or two of inactivity messages stop getting consumed and I >> keep >> > getting the following message in the logs: >> > >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms >> > elapsed since last write check. >> > 16:19:01,627 DEBUG [AbstractInactivityMonitor] Running >> > WriteCheck[tcp://my.ip.addr:61616] >> > 16:19:11,624 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms >> > elapsed since last write check. >> > 16:19:11,625 DEBUG [AbstractInactivityMonitor] Running >> > WriteCheck[tcp://my.ip.addr:61616] >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] WriteChecker 10000 ms >> > elapsed since last write check. >> > 16:19:11,627 DEBUG [AbstractInactivityMonitor] Running >> > WriteCheck[tcp://my.ip.addr:61616] >> > >> > >> > It looks like my connection becomes inactive after awhile and messages >> get >> > stuck in the queue. I can see in netstat the several ports >> > remain occupied but messages are not dequeued. >> > >> > Here's what my embedded service looks like (implemented as a servlet in >> > tomcat): >> > >> > public void init(ServletConfig config) throws ServletException { >> > super.init(config); >> > logger.info("Initializing message queue listener"); >> > try { >> > BrokerService broker = new BrokerService(); >> > >> > TransportConnector connector = new TransportConnector(); >> > connector.setUri(new URI("tcp://localhost:61616")); >> > broker.addConnector(connector); >> > broker.setPersistent(false); >> > broker.getSystemUsage().getTempUsage().setLimit(2000); >> > broker.start(); >> > EntityXMLConsumer.startInstance(); /this is the listener >> > } catch (Exception e) { >> > logger.error(e); >> > } >> > } >> > >> > >> > And here's my listener. >> > >> > public class EntityXMLConsumer implements MessageListener, >> > ExceptionListener{ >> > private static Logger logger = >> Logger.getLogger(EntityXMLConsumer.class); >> > private static ConfigManager configManager = >> ConfigManager.getInstance(); >> > private static String url = >> > configManager.getProperty("popworkflowsvc.activemq.url"); >> > //private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; >> > private static String subject = "WORKFLOW"; >> > private static Session session; >> > private MessageConsumer consumer; >> > private static Connection connection; >> > private static EntityXMLConsumer instance = null; >> > >> > private EntityXMLConsumer() throws JMSException{ >> > logger.info("Initializing message listener on url :" + url); >> > ActiveMQConnectionFactory connectionFactory = new >> > ActiveMQConnectionFactory(url); >> > connection = connectionFactory.createConnection(); >> > // to use transactions you should set the first >> parameter to >> > 'true' >> > session = connection.createSession(false, >> > Session.AUTO_ACKNOWLEDGE); >> > Destination destination = session.createQueue(subject); >> > consumer = session.createConsumer(destination); >> > >> > consumer.setMessageListener(this); >> > connection.start(); >> > } >> > public static EntityXMLConsumer startInstance() throws JMSException { >> > if (instance == null){ >> > instance = new EntityXMLConsumer(); >> > } >> > return instance; >> > } >> > public static void closeConnection() { >> > try { >> > logger.info("closing connection"); >> > connection.close(); >> > } catch (JMSException e) { >> > logger.error(e); >> > } >> > } >> > >> > >> > public void onException(JMSException ex) { >> > logger.error(ex); >> > closeConnection(); >> > } >> > >> > >> > public void onMessage(Message message) { >> > final String MSG_ID = "ENTITY_DATA"; >> > PopWorkflowServiceSEI service = new PopWorkflowServiceImpl(); >> > >> > try { >> > logger.info("message received: " + message ); >> > if (message instanceof MapMessage){ >> > MapMessage mapMessage = (MapMessage) message; >> > Map payloadMap = (Map)mapMessage.getObject(MSG_ID); >> > for (Object messageKey: payloadMap.keySet()){ >> > logger.info("Module ID: " + messageKey); >> > Object[] entityXML = ((List)(payloadMap.get(messageKey))).toArray(); >> > String[] strEntityXML = Arrays.copyOf(entityXML, entityXML.length, >> > String[].class); >> > service.initWorkflow(strEntityXML , (String)messageKey); >> > logger.debug("Entity XML " + payloadMap.get(messageKey)); >> > } >> > } >> > } catch (JMSException e) { >> > logger.error(e); >> > closeConnection(); >> > } >> > } >> > } >> > >> > >> > So it looks like either the listener or the broker stop responding and >> > messages get stuck in the queue. Are there any additional parameters I >> need >> > to configure? Any pointers appreciated. >> > >> > >> > Thanks for your help. >> > >> > >