Hi Manish,

Can you use a PooledActiveMQConnectionFactory from Jencks, and pooled connections, rather than the plain ActiveMQConnectionFactory? Using a single connection object for concurrent message writes to a JMS queue is not a good idea.
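To make the pooling idea concrete, here is a rough sketch that runs with only the JDK. It is an illustration under assumptions, not ActiveMQ or Jencks code: FakeConnection, ConnectionPool and sendConcurrently are hypothetical stand-ins. The point it shows is that connections are opened once, borrowed for each send, and handed back instead of being closed after every message.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: nothing in this file is ActiveMQ or Jencks API.
class PooledSendSketch {

    // Hypothetical stand-in for an expensive, long-lived broker connection.
    static final class FakeConnection {
        final AtomicInteger sent = new AtomicInteger();

        void send(String text) {
            sent.incrementAndGet();
        }
    }

    // Fixed-size pool: connections are created once and reused.
    static final class ConnectionPool {
        private final BlockingQueue<FakeConnection> idle;

        ConnectionPool(int size) {
            idle = new ArrayBlockingQueue<>(size);
            for (int i = 0; i < size; i++) {
                idle.add(new FakeConnection());
            }
        }

        // Blocks until a connection is free.
        FakeConnection borrow() throws InterruptedException {
            return idle.take();
        }

        // Returns the connection to the pool instead of closing it.
        void giveBack(FakeConnection c) {
            idle.add(c);
        }
    }

    // Sends `messages` messages from `threads` workers; returns how many were sent.
    static int sendConcurrently(int poolSize, int threads, int messages) {
        ConnectionPool pool = new ConnectionPool(poolSize);
        AtomicInteger total = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            final String text = "log-" + i;
            workers.submit(() -> {
                try {
                    FakeConnection c = pool.borrow();
                    try {
                        c.send(text);
                        total.incrementAndGet();
                    } finally {
                        pool.giveBack(c);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    public static void main(String[] args) {
        // Eight sender threads share two pooled connections.
        System.out.println("sent=" + sendConcurrently(2, 8, 100));
    }
}
```

With a real broker the pool would come from a pooled connection factory wrapped around your ActiveMQConnectionFactory, and sendToLogging would ask that factory for a connection on each call rather than building a new factory and connection every time.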
Cheers,
Ashwin...

manish_goyal wrote:
>
> Hi,
>
> I am getting an error while sending concurrent messages to an ActiveMQ JMS
> queue. My service sends multiple messages to the queue concurrently. The
> error occurs only when I send more than 3 times concurrently. If I send
> only two times or fewer, no error is thrown.
>
> I am getting the following error:
>
> ERROR - TcpTransport - Could not stop service: tcp://localhost/127.0.0.1:61616. Reason: java.lang.InterruptedException
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1165)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:245)
>     at org.apache.activemq.transport.tcp.TcpTransport.doStop(TcpTransport.java:459)
>     at org.apache.activemq.util.ServiceSupport.stop(ServiceSupport.java:63)
>     at org.apache.activemq.transport.tcp.TcpTransport.stop(TcpTransport.java:469)
>     at org.apache.activemq.transport.InactivityMonitor.stop(InactivityMonitor.java:113)
>     at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
>     at org.apache.activemq.transport.WireFormatNegotiator.stop(WireFormatNegotiator.java:87)
>     at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
>     at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
>     at org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:120)
>     at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
>     at org.apache.activemq.ActiveMQConnection.close(ActiveMQConnection.java:592)
>
> This error is thrown while closing the ActiveMQ connection.
>
> This is how I am sending the message to the queue:
>
> public void sendToLogging(LogObject ephService)
> {
>     Connection connection = null;
>     try {
>         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
>         connection = connectionFactory.createConnection();
>         connection.start();
>         subject = EPHConfig.getProperty("LOG_QUEUE_NAME", EPHConstants.COMMON_PROP_FILE_NAME);
>         Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
>         if (topic) {
>             destination = session.createTopic(subject);
>         } else {
>             destination = session.createQueue(subject);
>         }
>
>         // Create the producer.
>         MessageProducer producer = session.createProducer(destination);
>         if (persistent) {
>             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>         } else {
>             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         }
>         if (timeToLive != 0) {
>             producer.setTimeToLive(timeToLive);
>         }
>         try {
>             MessageFactory msgfact = MessageFactory.newInstance();
>             SOAPMessage soapMessage = msgfact.createMessage();
>
>             soapMessage.setProperty(javax.xml.soap.SOAPMessage.CHARACTER_SET_ENCODING, "UTF-8");
>             SOAPPart soapPart = soapMessage.getSOAPPart();
>             SOAPEnvelope envelope = soapPart.getEnvelope();
>
>             envelope.setEncodingStyle("http://schemas.xmlsoap.org/soap/encoding/");
>             SOAPBody soapBody = (SOAPBody)envelope.getBody();
>             JAXBContext jc = JAXBContext.newInstance("com.messages._1_0");
>             Marshaller marshaller = jc.createMarshaller();
>             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
>             marshaller.marshal(ephService, soapBody);
>             soapMessage.saveChanges();
>             soapMessage.writeTo(System.out);
>             ByteArrayOutputStream baos = new ByteArrayOutputStream();
>             soapMessage.writeTo(baos);
>             TextMessage txtmsg = session.createTextMessage();
>             txtmsg.setText(baos.toString());
>             producer.send(txtmsg);
>             if (transacted) {
>                 session.commit();
>             }
>         }
>         catch (Exception e) {
>             sendToConsole(EPHConstants.MARSHALLING_ERR_MSG+e.getMessage());
>         }
>         // Use the ActiveMQConnection interface to dump the connection
>         ActiveMQConnection actmqcon = (ActiveMQConnection)connection;
>     }
>     catch (Exception e) {
>         sendToConsole(EPHConstants.ACTIVEMQ_ERR_MSG+e.getMessage());
>     }
>     finally {
>         try {
>             connection.close();
>         }
>         catch (Exception exp) {
>             sendToConsole(EPHConstants.CONNCLOSE_ERR_MSG+exp.getMessage());
>         }
>     }
> }
>
> Please give any pointers to solve this problem.
>
> Thanks.
>
> Manish
>

---
Ashwin Karpe, Principal Consultant, PS - Opensource Center of Competence
Progress Software Corporation
14 Oak Park Drive
Bedford, MA 01730
---
+1-972-304-9084 (Office)
+1-972-971-1700 (Mobile)
---
Blog: http://opensourceknowledge.blogspot.com/
--
View this message in context: http://www.nabble.com/problem-while-sending-concurrent-messages-to-ActiveMQ-Jms-Queue-tp21578653p21615385.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.