Below is the source for MessageConsumer.java
package ss.integration.util; import javax.jms.TopicConnectionFactory; import javax.jms.TopicConnection; import javax.jms.Topic; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.MessageListener; import javax.jms.JMSException; import ss.util.logging.Logger; import ss.util.SystemException; import ss.util.JNDILookup; import ss.integration.util.ExListener; public class MessageConsumer { private TopicConnectionFactory _topicConnectionFactory; private TopicConnection _topicConnection; private Topic _topic; private TopicSession _session; private TopicSubscriber _subscriber; private String _strClientID; private String _strTopicConnectionFactory; private String _strTopic; private String _strSelector; private String _strContext; private String _strURL; private int _reconnectRetry; private long _reconnectTime; private boolean _firstTimeConnection; // flag to indicated that a shutdown has been requested private boolean _shutdownRequested; private static ExListener _exListener; private static MessageListener _msgListener; public MessageConsumer(String clientID_, String topicConnectionFactory_, String topic_, String selector_, String context_, String url_) throws SystemException { _strClientID = clientID_; _strTopicConnectionFactory = topicConnectionFactory_; _strTopic = topic_; _strSelector = selector_; _strContext = context_; _strURL = url_; _exListener = null; _msgListener = null; _reconnectRetry = 0; // reconnect is currently not used. _reconnectTime = 1 * 1000; _firstTimeConnection = true; _shutdownRequested = false; } public void init() throws SystemException { try { Logger.trace("MessageConsumer, setting the topicConnection, client: " + _strClientID); _topicConnectionFactory = (TopicConnectionFactory)JNDILookup.lookup(_strTopicConnectionFactory, _strContext, _strURL); _topicConnection = _topicConnectionFactory.createTopicConnection(); _topicConnection.setClientID(_strClientID); _topic = (Topic)JNDILookup.lookup(_strTopic, _strContext, _strURL); _session = _topicConnection.createTopicSession(true, 0); _subscriber = _session.createDurableSubscriber(_topic, _strClientID, _strSelector, true); Logger.trace("_topic=" + _topic + " _strClientID=" + _strClientID + " _strSelector = " + _strSelector); Logger.trace("_subscriber=" + _subscriber); if (_exListener == null) { _exListener = new ExListener(this); } } catch (JMSException je) { Logger.trace("MessageConsumer init() failed: JMSException"); throw new SystemException(je); } catch (SystemException se) { Logger.trace("MessageConsumer init() failed: SystemException"); throw new SystemException(se); } catch (Exception e) { Logger.trace("MessageConsumer init() failed: Exception"); throw new SystemException(e); } } public void start() throws SystemException { synchronized (this) { while (!_shutdownRequested) { try { startConnection(); Logger.trace("Wait()..." ); wait(); Logger.trace("Waiting thread awaken" ); } catch (Exception e) { Logger.error("Error in MessageConsumer.start: " + e.getMessage()); throw new SystemException(e); } finally { close(); } } } } private void signalShutdown() { Logger.trace("Shutting down..."); _shutdownRequested = true; notifyWaitingThread(); } // to signal a waiting thread (waiting inside start() method) public synchronized void notifyWaitingThread() { unregisterListener(); Logger.trace("Notifying/signaling a waiting thread..."); notify(); } public void setListener (MessageListener listener_) { _msgListener = listener_; } private void registerMsgListener() throws SystemException { if (_msgListener != null) { try { Logger.trace("Registering Message Listener..."); // install asynchronous listener _subscriber.setMessageListener(_msgListener); } catch (JMSException je) { throw new SystemException(je); } catch (Exception e) { throw new SystemException(e); } } else { Logger.warning("MessageListener is null"); } } private void unregisterListener() { try { Logger.trace("Unregistering Message Listener..."); _subscriber.setMessageListener(null); } catch (Exception e) { Logger.error(e); } } private void registerExceptionListener() throws SystemException { if (_exListener != null) { try { Logger.trace("Registering JMS Exception Listener..."); // install asynchronous listener _topicConnection.setExceptionListener(_exListener); } catch (JMSException je) { throw new SystemException(je); } catch (Exception e) { throw new SystemException(e); } } else { Logger.warning("ExceptionListener is null"); } } public void doCommit () throws SystemException { try { _session.commit(); } catch (JMSException je) { throw new SystemException(je); } catch (Exception e) { throw new SystemException(e); } } public void doRollback () throws SystemException { try { Logger.trace("doRollback() Rolling-back message..."); _session.rollback(); // Be careful when using this. Sometimes the rollback doesn't work. Logger.trace("doRollback() Message rolled-back"); signalShutdown(); } catch (JMSException je) { Logger.trace("doRollback() JMS Exception: " + je.getMessage()); throw new SystemException(je); } catch (Exception e) { Logger.trace("doRollback() Exception :" + e.getMessage()); throw new SystemException(e); } } public void close() throws SystemException { try { if (_subscriber != null) { Logger.trace("closing subscriber..."); _subscriber.close(); _subscriber = null; Logger.trace("subscriber closed"); } if (_session != null) { Logger.trace("closing session..."); _session.close(); _session = null; Logger.trace("session closed"); } if (_topicConnection != null) { Logger.trace("closing topic connection..."); _topicConnection.close(); _topicConnection = null; Logger.trace("topicConnection closed"); } _topicConnectionFactory = null; _topic = null; } catch (Exception e) { Logger.trace("Exception in close()"); } } private void startConnection() throws SystemException { boolean success = false; for (int i=0; i<_reconnectRetry || _firstTimeConnection; i++) { _firstTimeConnection = false; try { Logger.trace("Closing all connections... "); close(); if (_exListener != null) { wait(_reconnectTime); } Logger.trace("Starting all connections... (" + (i+1) + "/" + _reconnectRetry + ")"); init(); Logger.trace("Init successfull"); registerMsgListener(); Logger.trace("RegisterMsgListener successfull"); registerExceptionListener(); Logger.trace("RegisterExceptionListener successfull"); Logger.trace("Starting TopicConnection..."); _topicConnection.start(); Logger.trace("TopicConnection started..."); success = true; break; } catch (Exception e) { Logger.error("startConnection() failed"); Logger.error(e); } } if (!success) { close(); Logger.error("Unable to connect to the JMS Server"); throw new SystemException("Unable to connect to the JMS Server"); } } } James.Strachan wrote: > > The exception is being generated from the code in this package : > ss.integration.util > > I've no idea what that is - got the source? > > On 22/01/2008, activemqnewbie <[EMAIL PROTECTED]> wrote: >> >> We are getting NullpointerException and the consumer shuts off. >> Consumer after receiving certain number of messages consumer >> automatically >> closes throwing a null pointer exception. >> Activemq 5 >> Oracle10g >> >> Below is the stack trace. >> >> <ERROR><2008/01/22 09:52:56.470><23459640><> ExceptionListener receives a >> JMS exception >> >> <TRACE><2008/01/22 09:52:56.470><23459640><> Unregistering Message >> Listener... >> >> <TRACE><2008/01/22 09:52:56.470><23459640><> Notifying/signaling a >> waiting >> thread... >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> Waiting thread awaken >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> closing subscriber... >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> subscriber closed >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> closing session... >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> session closed >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic connection... >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in close() >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic connection... >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in close() >> >> <ERROR><2008/01/22 09:52:56.470><20688146><> Unable to connect to the JMS >> Server >> >> <ERROR><2008/01/22 09:52:56.470><23459640><> ExceptionListener receives a >> JMS exception >> >> <ERROR><2008/01/22 09:52:56.470><20688146><> Error in >> MessageConsumer.start: >> Unable to connect to the JMS Server >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic connection... >> >> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in close() >> >> <TRACE><2008/01/22 09:52:56.470><23459640><> Unregistering Message >> Listener... >> <ERROR><2008/01/22 09:52:56.470><20688146><> Error in main: Unable to >> connect to the JMS Server >> >> >> <ERROR><2008/01/22 09:52:56.470><20688146><> StackTrace: >> >> <ERROR><2008/01/22 09:52:56.470><20688146><> >> Error Code: -1 >> k12.util.SystemException: Unable to connect to the JMS Server >> at >> ss.integration.util.MessageConsumer.startConnection(MessageConsumer.java:368) >> at >> ss.integration.util.MessageConsumer.start(MessageConsumer.java:124) >> at >> ss.integration.agent.sams.SInputStarter.start(SInputStarter.java:113) >> at >> ss.integration.agent.sams.SInputStarter.main(SInputStarter.java:45) >> >> <ERROR><2008/01/22 09:52:56.470><23459640><> >> java.lang.NullPointerException >> at >> ss.integration.util.MessageConsumer.unregisterListener(MessageConsumer.java:205) >> at >> ss.integration.util.MessageConsumer.notifyWaitingThread(MessageConsumer.java:159) >> at ss.integration.util.ExListener.onException(ExListener.java:26) >> at >> org.apache.activemq.ActiveMQConnection$3.run(ActiveMQConnection.java:1648) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675) >> at java.lang.Thread.run(Thread.java:595) >> >> >> >> <TRACE><2008/01/22 09:52:56.470><23459640><> Notifying/signaling a >> waiting >> thread... >> >> >> >> Thanks for the help, >> Vij >> >> >> >> >> -- >> View this message in context: >> http://www.nabble.com/Activemq.xml-configuration-settings-directory-tp14957812s2354p15020252.html >> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >> >> > > > -- > James > ------- > http://macstrac.blogspot.com/ > > Open Source Integration > http://open.iona.com > > -- View this message in context: http://www.nabble.com/Activemq.xml-configuration-settings-directory-tp14957812s2354p15023059.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.