Here come the attachments. At 2013-06-16 16:07:42,lzr <jsw...@163.com> wrote: >Dear all, > > >I found a thread leak issue while trying ActiveMQ 5.6: >I create a connection when sending a message and close it once the sending >has finished. >Occasionally I restart the ActiveMQ broker and find one thread generated with the >name "ActiveMQ Connection Executor...". >To investigate further, I restarted the broker several times and the same number of threads >was generated; I attached screenshots of my debugging session. >Could anybody give me some clues? I just want to know if this is a bug in >ActiveMQ. The attachments include my application and the debugging screenshots. > > >It would be much appreciated if you could make any suggestions! > > >Thanks in advance, >Zhuran Li > > > > > > > > >At 2013-06-14 23:03:55,"Christian Posta" <christian.po...@gmail.com> wrote: >>Would need to be able to re-create this... Please put together a unit test >>that reproduces this. >> >> >>On Fri, Jun 14, 2013 at 1:00 AM, lzr <jsw...@163.com> wrote: >> >>> Dear all, >>> >>> >>> I'm using ActiveMQ 5.6 with my app and struggling to handle the following >>> Exception: >>> 2013-06-13 09:53:46,018 | WARN | Send failed for: ActiveMQObjectMessage >>> {commandId = 6, responseRequired = false, messageId = >>> ID:nfsnnc02-46547-1370842535042-0:547519:1:1:1, originalDestination = null, >>> originalTransactionId = null, producerId = >>> ID:nfsnnc02-46547-1370842535042-0:547519:1:1, destination = >>> queue://10408_base, transactionId = >>> TX:ID:nfsnnc02-46547-1370842535042-0:547519:1, expiration = 0, timestamp = >>> 1371088670829, arrival = 0, brokerInTime = 1371088426003, brokerOutTime = >>> 0, correlationId = null, replyTo = null, persistent = true, type = null, >>> priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, >>> compressed = false, userID = null, content = >>> org.apache.activemq.util.ByteSequence@c4b67b, marshalledProperties = >>> org.apache.activemq.util.ByteSequence@a84ac2, dataStructure = null, >>> 
redeliveryCounter = 0, size = 0, properties = {createtime=2013-06-13 >>> 09:57:50, datasize=1, syncflag=bd_defdoc}, readOnlyProperties = false, >>> readOnlyBody = false, droppable = fals >>> e}, missing producer state for: >>> org.apache.activemq.broker.ProducerBrokerExchange@c44470 | >>> org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:/// >>> 10.1.100.66:64118 >>> 2013-06-13 09:53:46,018 | WARN | Async error occurred: >>> javax.jms.JMSException: Cannot send message to queue://10408_base with >>> invalid (null) producer state | >>> org.apache.activemq.broker.TransportConnection.Service | ActiveMQ >>> Transport: tcp:///10.1.100.66:64118 >>> javax.jms.JMSException: Cannot send message to queue://10408_base with >>> invalid (null) producer state >>> at org.apache.activemq.broker.region.Queue.send(Queue.java:589) >>> at >>> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:407) >>> at >>> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:503) >>> at >>> org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:305) >>> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129) >>> at >>> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96) >>> at >>> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:306) >>> at >>> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135) >>> at >>> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:453) >>> at >>> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681) >>> at >>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292) >>> at >>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:150) >>> at >>> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) >>> at >>> 
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113) >>> at >>> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:229) >>> at >>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) >>> at >>> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222) >>> at >>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204) >>> at java.lang.Thread.run(Thread.java:619) >>> >>> >>> Could anybody give me some clue? It would be much appreciated if any >>> questions. >>> >>> >>> Thanks a lot, >>> Zhuran Li >> >> >> >> >>-- >>*Christian Posta* >>http://www.christianposta.com/blog >>twitter: @christianposta
/** * */ package amq.test;
import java.util.HashMap;

/**
 * Driver that repeatedly asks a fresh {@link UFMQProcessor} to run
 * {@code sendSerializedObjMsg} against a fixed broker address, pausing one
 * second between attempts.
 *
 * @author liujju
 */
public class SendRun implements Runnable {

    /** Pause between two consecutive attempts, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000L;

    /**
     * Loops until the executing thread is interrupted. Every iteration creates
     * a new {@link UFMQProcessor} and invokes {@code sendSerializedObjMsg};
     * failures of that call are printed and the loop continues with the next
     * attempt.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                new UFMQProcessor("192.168.183.225", "test", Integer.valueOf(60101))
                        .sendSerializedObjMsg("HelloWorld", new HashMap<String, String>());
            } catch (Exception e) {
                // A failed attempt must not stop the loop; report it and retry.
                e.printStackTrace();
            }

            try {
                // sleep is static: call it on the class, not on an instance.
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop, so the worker can actually be
                // shut down instead of swallowing the request and looping on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Starts a single worker thread running {@link #run()}.
     *
     * @param args ignored
     * @throws InterruptedException never thrown by the current body; kept so
     *         the declared signature stays unchanged
     */
    public static void main(String[] args) throws InterruptedException {
        Thread demo = new Thread(new SendRun());
        demo.start();
    }
}
package amq.test; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.Map; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQSession; /** * MQ´¦ÀíÀ࣬·â×°Á˳£ÓÃÏûÏ¢¸ñʽµÄ·¢ËͺͽÓÊÕ(Ŀǰֻ֧³ÖP2P) */ public class UFMQProcessor { private static final int BUFFER_BYTE_LENGTH = 1024; // ¿Í»§¶ËMQ·þÎñÆ÷IP private String ip = null; // ¿Í»§¶Ë½ÓÊÕ¶ÓÁÐÃû private String queName = null; // ¿Í»§¶Ë½ÓMQ·þÎñÆ÷¶Ë¿Ú private Integer port = null; public void setIp(String ip) { this.ip = ip; } public void setQueName(String queName) { this.queName = queName; } public void setPort(Integer port) { this.port = port; } public String getIp() { return ip; } public String getQueName() { return queName; } public Integer getPort() { return port; } public UFMQProcessor(String ip, String queName, Integer port) { super(); this.ip = ip; this.queName = queName; this.port = port; } /** * ·¢ËÍ¿ÉÐòÁл¯¶ÔÏóÏûÏ¢ * * @param data * @param queueName */ public void sendSerializedObjMsg(Serializable data, Map<String, String> msgProperties) throws Exception { // ´´½¨Á¬½Ó ActiveMQConnection conn = null; long t1 = System.currentTimeMillis(); try { System.out.println("start con:" + t1); conn = getConnection(getIp(), getPort()); long t2 = System.currentTimeMillis(); System.out.println("start con end:" + t2 + " spend:" + (t2 - t1)); // Session ss = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // System.out.println("Begin to send message ..."); // Queue queue = 
ss.createQueue(getQueName()); // MessageProducer prod = ss.createProducer(queue); // ObjectMessage obejctmsg = ss.createObjectMessage(data); // setMsgPropertys(obejctmsg, msgProperties); // prod.send(obejctmsg); // prod.close(); // // ss.commit(); // System.out.println("Message sent successfully."); } catch (Exception e) { System.out.println("Exception ..."); long t2 = System.currentTimeMillis(); System.out.println("error:" + t2 + " spend:" + (t2 - t1)); throw new Exception(e.getMessage(), e); } finally { if (conn != null && !conn.isClosed()) { try { System.out.println("finally close con...."); conn.close(); System.out.println("finally close con end...."); } catch (JMSException e) { System.out.println("finally close con exe...."); throw new Exception(e.getMessage(), e); } } } } protected void setMsgPropertys(ObjectMessage obejctmsg, Map<String, String> msgProperties) { try { for (Map.Entry<String, String> entry : msgProperties.entrySet()) { obejctmsg.setStringProperty(entry.getKey(), entry.getValue()); } } catch (JMSException e) { System.out.println("ÏûÏ¢ÊôÐÔÉèÖÃʧ°Ü£¡"); } } /** * ½«¶ÔÏóÒÔxmlÐÎʽ·¢ËÍ * * @param data * @param queueName */ public void sendObjToXmlMsg(Object data) throws Exception { // XStream stream = new XStream(new DomDriver()); // // String xml = stream.toXML(data); // // sendTextMsg(xml); } /** * ½«¶ÔÏóÒÔjsonÐÎʽ·¢ËÍ * * @param data * @param queueName */ public void sendObjToJsonMsg(Object data) throws Exception { // XStream stream = new XStream(new JsonHierarchicalStreamDriver()); // // String xml = stream.toXML(data); // // sendTextMsg(xml); } /** * ·¢ËÍÎı¾ÏûÏ¢ * * @param data * @param queueName */ public void sendTextMsg(String data) throws Exception { // ´´½¨Á¬½Ó ActiveMQConnection conn = getConnection(getIp(), getPort()); try { // ¿ªÆôsession ´´½¨×Ô¼º¿ØÖÆÊÂÎñµÄsession Session ss = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // Á¬½Ó¶ÓÁÐ Queue queue = ss.createQueue(getQueName()); MessageProducer prod = ss.createProducer(queue); // 
·¢ËÍÊý¾Ý TextMessage txtmsg = ss.createTextMessage(data); prod.send(txtmsg); prod.close(); // ÊÂÎñ´¦Àí ss.commit(); System.out.println("ÏûÏ¢·¢Ëͳɹ¦"); } catch (Exception e) { System.out.println("ÏûÏ¢·¢ËÍʧ°Ü"); throw new Exception(e.getMessage()); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { throw new Exception(e.getMessage()); } } } } /** * ·¢ËÍ×Ö½ÚÏûÏ¢£¬ÓÃÓÚÎļþ´«Êä * * @param sourcefile * @param queueName */ public void sendFileMsg(String sourcefile) throws Exception { BufferedInputStream in = null; // ´´½¨Á¬½Ó ActiveMQConnection conn = getConnection(getIp(), getPort()); try { // ¿ªÆôsession ´´½¨×Ô¼º¿ØÖÆÊÂÎñµÄsession Session ss = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // Á¬½Ó¶ÓÁÐ Queue queue = ss.createQueue(getQueName()); MessageProducer prod = ss.createProducer(queue); // ·¢ËÍÊý¾Ý BytesMessage bytesmsg = ss.createBytesMessage(); in = new BufferedInputStream(new FileInputStream(sourcefile)); byte[] buf = new byte[BUFFER_BYTE_LENGTH]; int bytesread; while ((bytesread = in.read(buf)) != -1) { bytesmsg.writeBytes(buf, 0, bytesread); } prod.send(bytesmsg); prod.close(); // ÊÂÎñ´¦Àí ss.commit(); System.out.println("ÏûÏ¢·¢Ëͳɹ¦"); } catch (Exception e) { System.out.println("ÏûÏ¢·¢ËÍʧ°Ü"); throw new Exception(e.getMessage()); } finally { if (in != null) { try { in.close(); } catch (IOException ioe) { throw new Exception(ioe); } } if (conn != null) { try { conn.close(); } catch (JMSException e) { throw new Exception(e.getMessage()); } } } } /** * ½ÓÊÕÏûÏ¢ * * @return * @throws Exception */ public Message recieveMsg() throws Exception { // ´´½¨Á¬½Ó ActiveMQConnection conn = getConnection(getIp(), getPort()); try { // ¿ªÆôsession ´´½¨×Ô¼º¿ØÖÆÊÂÎñµÄsession ActiveMQSession queSession = (ActiveMQSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = queSession.createQueue(getQueName()); ActiveMQMessageConsumer recv = (ActiveMQMessageConsumer) queSession.createConsumer(queue); 
System.out.println("¿ªÊ¼½ÓÊÕÏûÏ¢..."); long startTime = System.currentTimeMillis(); Message msg = recv.receive(); queSession.commit(); long endTime = System.currentTimeMillis(); System.out.println("ÏûÏ¢½ÓÊÕÍê±Ï,¹²»¨Ê±: " + (endTime - startTime) + " ms"); return msg; } catch (JMSException e) { System.out.println("½ÓÊÕÏûÏ¢Òì³£"); throw new Exception(e); } finally { try { conn.close(); } catch (JMSException jme) { throw new Exception(jme); } } } /** * ½ÓÊÕÎı¾ÏûÏ¢ * * @return * @throws Exception */ public String recieveStrMsg() throws Exception { TextMessage msg = (TextMessage) recieveMsg(); try { return msg.getText(); } catch (JMSException e) { throw new Exception(e); } } /** * ½ÓÊÕxmlÐÎʽ¶ÔÏó * * @param data * @param queueName */ public Object recieveXmlToObjMsg() throws Exception { // // String xml = recieveStrMsg(); // // XStream stream = new XStream(new DomDriver()); // // return stream.fromXML(xml); return null; } /** * ½ÓÊÕjsonÐÎʽ¶ÔÏó * * @param data * @param queueName */ public Object recieveJsonToObjMsg() throws Exception { // // String json = recieveStrMsg(); // // XStream stream = new XStream(new JsonHierarchicalStreamDriver()); // // return stream.fromXML(json); return null; } /** * ½ÓÊÕÐòÁл¯¶ÔÏó * * @return * @throws Exception */ public Serializable recieveSerializedObjMsg() throws Exception { ObjectMessage msg = (ObjectMessage) recieveMsg(); try { return msg.getObject(); } catch (JMSException e) { throw new Exception(e); } } /** * ½ÓÊÕÎļþ * * @param resultfile * @return * @throws Exception */ public File recieveFileMsg(String filepath) throws Exception { BytesMessage msg = (BytesMessage) recieveMsg(); try { int rd; BufferedOutputStream fos = null; File resultFile = new File(filepath); try { fos = new BufferedOutputStream(new FileOutputStream(resultFile)); byte[] buf = new byte[BUFFER_BYTE_LENGTH]; while ((rd = msg.readBytes(buf)) != -1) { fos.write(buf, 0, rd); } fos.flush(); } catch (Exception ex) { throw new Exception(ex); } finally { try { if (fos != 
null) { fos.close(); } } catch (IOException io) { throw new Exception(io); } } return resultFile; } catch (Exception e) { throw new Exception(e); } } /** * ·µ»ØMQÁ¬½Ó * * @return * @throws JMSException */ public ActiveMQConnection getConnection(String ip, Integer port) throws Exception { try { if (ip == null || ("").equals(ip) || port == null) { throw new JMSException("IP»ò¶Ë¿Ú²»ÄÜΪ¿Õ£¡£¡"); } return getConnection(Thread.currentThread().getName(), ip, port); } catch (Exception e) { throw new Exception(e.getMessage()); } } /** * ´´½¨MQÁ¬½Ó * * @param cid * @return * @throws JMSException */ public ActiveMQConnection getConnection(String cid, String ip, Integer port) throws JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://" + ip + ":" + port + "?wireFormat.maxInactivityDuration=90000"); cf.getPrefetchPolicy().setQueuePrefetch(1); ActiveMQConnection conn = (ActiveMQConnection) cf.createConnection(); conn.start(); return conn; } }