Here come the attachments.

At 2013-06-16 16:07:42,lzr <jsw...@163.com> wrote:
>Dear all,
>
>
>I found a thread leak issue while trying out ActiveMQ 5.6:
>I create a connection when sending a message and close it once the sending 
>has finished.
>Occasionally I restart the ActiveMQ broker, and each time I find one new thread 
>named "ActiveMQ Connection Executor...".
>To dig deeper, I restarted the broker several times and the same number of 
>threads was created; I have attached screenshots of my debugging session.
>Could anybody give me some clues? I just want to know whether this is a bug in 
>ActiveMQ. The attachments include my application and the debugging screenshots.
>
>
>Any suggestions would be much appreciated!
>
>
>Thanks in advance,
>Zhuran Li
>
>
>
>
>
>
>
>
>At 2013-06-14 23:03:55,"Christian Posta" <christian.po...@gmail.com> wrote:
>>Would need to be able to re-create this... Please put together a unit test
>>that reproduces this.
>>
>>
>>On Fri, Jun 14, 2013 at 1:00 AM, lzr <jsw...@163.com> wrote:
>>
>>> Dear all,
>>>
>>>
>>> I'm using ActiveMQ 5.6 with my app and struggling to handle the following
>>> exception:
>>> 2013-06-13 09:53:46,018 | WARN  | Send failed for: ActiveMQObjectMessage
>>> {commandId = 6, responseRequired = false, messageId =
>>> ID:nfsnnc02-46547-1370842535042-0:547519:1:1:1, originalDestination = null,
>>> originalTransactionId = null, producerId =
>>> ID:nfsnnc02-46547-1370842535042-0:547519:1:1, destination =
>>> queue://10408_base, transactionId =
>>> TX:ID:nfsnnc02-46547-1370842535042-0:547519:1, expiration = 0, timestamp =
>>> 1371088670829, arrival = 0, brokerInTime = 1371088426003, brokerOutTime =
>>> 0, correlationId = null, replyTo = null, persistent = true, type = null,
>>> priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
>>> compressed = false, userID = null, content =
>>> org.apache.activemq.util.ByteSequence@c4b67b, marshalledProperties =
>>> org.apache.activemq.util.ByteSequence@a84ac2, dataStructure = null,
>>> redeliveryCounter = 0, size = 0, properties = {createtime=2013-06-13
>>> 09:57:50, datasize=1, syncflag=bd_defdoc}, readOnlyProperties = false,
>>> readOnlyBody = false, droppable = fals
>>>  e},  missing producer state for:
>>> org.apache.activemq.broker.ProducerBrokerExchange@c44470 |
>>> org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///
>>> 10.1.100.66:64118
>>> 2013-06-13 09:53:46,018 | WARN  | Async error occurred:
>>> javax.jms.JMSException: Cannot send message to queue://10408_base with
>>> invalid (null) producer state |
>>> org.apache.activemq.broker.TransportConnection.Service | ActiveMQ
>>> Transport: tcp:///10.1.100.66:64118
>>> javax.jms.JMSException: Cannot send message to queue://10408_base with
>>> invalid (null) producer state
>>> at org.apache.activemq.broker.region.Queue.send(Queue.java:589)
>>> at
>>> org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:407)
>>> at
>>> org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:503)
>>> at
>>> org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:305)
>>> at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:129)
>>> at
>>> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
>>> at
>>> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:306)
>>> at
>>> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
>>> at
>>> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:453)
>>> at
>>> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
>>> at
>>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>>> at
>>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:150)
>>> at
>>> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
>>> at
>>> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>>> at
>>> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:229)
>>> at
>>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>>> at
>>> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222)
>>> at
>>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
>>> at java.lang.Thread.run(Thread.java:619)
>>>
>>>
>>> Could anybody give me some clues? Any help would be much appreciated;
>>> please let me know if you have any questions.
>>>
>>>
>>> Thanks a lot,
>>> Zhuran Li
>>
>>
>>
>>
>>-- 
>>*Christian Posta*
>>http://www.christianposta.com/blog
>>twitter: @christianposta
/**
 * 
 */
package amq.test;

import java.util.HashMap;

/**
 * Reproduction driver: once per second, asks a fresh {@code UFMQProcessor} to run its
 * {@code sendSerializedObjMsg} cycle against a hard-coded broker endpoint.
 * <p>
 * The loop ends when the executing thread is interrupted.
 *
 * @author liujju
 */
public class SendRun implements Runnable {

    /** Pause between two consecutive send attempts, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 1000L;

    /**
     * Repeats the send attempt until the current thread is interrupted.
     * <p>
     * A failed attempt is reported with a stack trace and does not stop the loop.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {

        while (!Thread.currentThread().isInterrupted()) {
            try {
                new UFMQProcessor("192.168.183.225", "test", Integer.valueOf(60101))
                    .sendSerializedObjMsg("HelloWorld", new HashMap<String, String>());
            } catch (Exception e) {
                e.printStackTrace();
            }

            try {
                // sleep is static; calling it on Thread directly makes clear which thread pauses.
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop: swallowing the interrupt would make
                // this worker impossible to shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }

    }

    /**
     * Starts a single worker thread running {@link #run()}.
     *
     * @param args ignored
     * @throws InterruptedException declared for compatibility; not thrown by the current body
     */
    public static void main(String[] args) throws InterruptedException {
        Thread demo = new Thread(new SendRun());
        demo.start();
    }

}
package amq.test;


import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;

/**
 * MQ helper class that wraps sending and receiving of common message formats
 * (currently point-to-point queues only).
 */

public class UFMQProcessor {

    private static final int BUFFER_BYTE_LENGTH = 1024;

    // IP address of the MQ server the client connects to

    private String ip = null;

    // Name of the queue the client uses

    private String queName = null;

    // Port of the MQ server the client connects to

    private Integer port = null;

    /**
     * Sets the broker host stored in this processor.
     *
     * @param ip host name or IP address of the MQ server
     */
    public void setIp(String ip) {

        this.ip = ip;

    }

    /**
     * Sets the queue name stored in this processor.
     *
     * @param queName name of the queue to send to or receive from
     */
    public void setQueName(String queName) {

        this.queName = queName;

    }

    /**
     * Sets the broker port stored in this processor.
     *
     * @param port TCP port of the MQ server
     */
    public void setPort(Integer port) {

        this.port = port;

    }

    /**
     * Returns the broker host stored in this processor.
     *
     * @return the configured host or IP address; may be {@code null} if never set
     */
    public String getIp() {

        return ip;

    }

    /**
     * Returns the queue name stored in this processor.
     *
     * @return the configured queue name; may be {@code null} if never set
     */
    public String getQueName() {

        return queName;

    }

    /**
     * Returns the broker port stored in this processor.
     *
     * @return the configured port; may be {@code null} if never set
     */
    public Integer getPort() {

        return port;

    }

    /**
     * Creates a processor bound to one broker endpoint and one queue.
     *
     * @param ip      host name or IP address of the MQ server
     * @param queName name of the queue to send to or receive from
     * @param port    TCP port of the MQ server
     */
    public UFMQProcessor(String ip, String queName, Integer port) {
        // The superclass constructor is invoked implicitly; the three assignments are independent.
        this.port = port;
        this.queName = queName;
        this.ip = ip;
    }

    /**
     * Opens a connection to the configured broker and closes it again, printing timing
     * information for each step.
     * <p>
     * NOTE: the statements that would create the session and producer and send {@code data}
     * are currently commented out, so neither {@code data} nor {@code msgProperties} is used;
     * only the connect/close cycle runs.
     *
     * @param data          serializable payload (unused while the send code is disabled)
     * @param msgProperties string properties for the message (unused while the send code is
     *                      disabled)
     * @throws Exception wrapping any failure raised while connecting or while closing the
     *                   connection
     */
    public void sendSerializedObjMsg(Serializable data,
                                     Map<String, String> msgProperties) throws Exception {

        final long connectStart = System.currentTimeMillis();
        ActiveMQConnection conn = null;

        try {
            System.out.println("start con:" + connectStart);
            conn = getConnection(getIp(), getPort());

            final long connectEnd = System.currentTimeMillis();
            final long connectSpent = connectEnd - connectStart;
            System.out.println("start con end:" + connectEnd + " spend:" + connectSpent);

            // Disabled send logic, kept for reference:
            //   Session ss = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //   System.out.println("Begin to send message ...");
            //   Queue queue = ss.createQueue(getQueName());
            //   MessageProducer prod = ss.createProducer(queue);
            //   ObjectMessage obejctmsg = ss.createObjectMessage(data);
            //   setMsgPropertys(obejctmsg, msgProperties);
            //   prod.send(obejctmsg);
            //   prod.close();
            //
            //   ss.commit();
            //   System.out.println("Message sent successfully.");
        } catch (Exception connectFailure) {
            System.out.println("Exception ...");
            final long failedAt = System.currentTimeMillis();
            final long failedAfter = failedAt - connectStart;
            System.out.println("error:" + failedAt + " spend:" + failedAfter);
            throw new Exception(connectFailure.getMessage(), connectFailure);
        } finally {
            // Only a connection that was obtained and is still open needs closing.
            final boolean stillOpen = conn != null && !conn.isClosed();
            if (stillOpen) {
                try {
                    System.out.println("finally close con....");
                    conn.close();
                    System.out.println("finally close con end....");
                } catch (JMSException closeFailure) {
                    System.out.println("finally close con exe....");
                    throw new Exception(closeFailure.getMessage(), closeFailure);
                }
            }
        }

    }

    /**
     * Copies every entry of {@code msgProperties} onto {@code obejctmsg} as a string property.
     * <p>
     * This is best-effort: if setting a property fails, the failure is reported on the console
     * and the remaining entries are skipped; no exception reaches the caller.
     *
     * @param obejctmsg     message that receives the properties
     * @param msgProperties property names and values; {@code null} is treated as "no properties"
     */
    protected void setMsgPropertys(ObjectMessage obejctmsg,
                                   Map<String, String> msgProperties) {

        // A missing map simply means there is nothing to copy.
        if (msgProperties == null) {
            return;
        }

        try {

            for (Map.Entry<String, String> entry : msgProperties.entrySet()) {
                obejctmsg.setStringProperty(entry.getKey(), entry.getValue());
            }

        } catch (JMSException e) {

            System.out.println("ÏûÏ¢ÊôÐÔÉèÖÃʧ°Ü£¡");
            // Keep the cause visible; the message above alone does not say what went wrong.
            e.printStackTrace();

        }

    }

    /**
     * Intended to send an object serialized as XML.
     * <p>
     * Currently a no-op: the XStream-based implementation below is commented out, so
     * {@code data} is ignored and nothing is sent.
     *
     * @param data object that the disabled implementation would serialize to XML
     * @throws Exception declared by the signature; the current empty body throws nothing
     */

    public void sendObjToXmlMsg(Object data) throws Exception {

        // XStream stream = new XStream(new DomDriver());
        //
        // String xml = stream.toXML(data);
        //
        // sendTextMsg(xml);

    }

    /**
     * Intended to send an object serialized as JSON.
     * <p>
     * Currently a no-op: the XStream-based implementation below is commented out, so
     * {@code data} is ignored and nothing is sent.
     *
     * @param data object that the disabled implementation would serialize to JSON
     * @throws Exception declared by the signature; the current empty body throws nothing
     */

    public void sendObjToJsonMsg(Object data) throws Exception {

        // XStream stream = new XStream(new JsonHierarchicalStreamDriver());
        //
        // String xml = stream.toXML(data);
        //
        // sendTextMsg(xml);

    }

    /**
     * Sends {@code data} as a {@code TextMessage} to the configured queue.
     * <p>
     * A new connection is opened for the call and closed before returning. The session is
     * created with the transacted flag set, and {@code commit()} is called after the send.
     *
     * @param data text body of the message
     * @throws Exception if connecting, sending or committing fails (the original failure is
     *                   attached as the cause), or if closing the connection fails after an
     *                   otherwise successful send
     */
    public void sendTextMsg(String data) throws Exception {

        // Create the connection.
        ActiveMQConnection conn = getConnection(getIp(), getPort());

        // Remembers the primary failure so a close() error in finally cannot replace it.
        Exception failure = null;

        try {

            // Open a session whose transaction is controlled by this method.
            Session ss = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            // Bind to the queue.
            Queue queue = ss.createQueue(getQueName());
            MessageProducer prod = ss.createProducer(queue);

            // Send the data.
            TextMessage txtmsg = ss.createTextMessage(data);
            prod.send(txtmsg);
            prod.close();

            // Commit the transaction.
            ss.commit();

            System.out.println("ÏûÏ¢·¢Ëͳɹ¦");

        } catch (Exception e) {

            System.out.println("ÏûÏ¢·¢ËÍʧ°Ü");

            // Keep the cause so the original stack trace is not lost.
            failure = new Exception(e.getMessage(), e);
            throw failure;

        } finally {

            if (conn != null) {

                try {

                    conn.close();

                } catch (JMSException e) {

                    // Report a close failure only when nothing else has already failed.
                    if (failure == null) {
                        throw new Exception(e.getMessage(), e);
                    }

                }

            }

        }

    }

    /**
     * Sends the content of a file as a single {@code BytesMessage} to the configured queue.
     * <p>
     * The file is read in chunks of {@code BUFFER_BYTE_LENGTH} bytes and appended to one
     * message, which is then sent and committed. A new connection is opened for the call;
     * both the file stream and the connection are closed before returning, even when closing
     * one of them fails.
     *
     * @param sourcefile path of the file to send
     * @throws Exception if connecting, reading, sending or committing fails (the original
     *                   failure is attached as the cause), or if closing the stream or the
     *                   connection fails after an otherwise successful send
     */
    public void sendFileMsg(String sourcefile) throws Exception {

        BufferedInputStream in = null;

        // Create the connection.
        ActiveMQConnection conn = getConnection(getIp(), getPort());

        // Remembers the primary failure so a close() error in finally cannot replace it.
        Exception failure = null;

        try {

            // Open a session whose transaction is controlled by this method.
            Session ss = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            // Bind to the queue.
            Queue queue = ss.createQueue(getQueName());
            MessageProducer prod = ss.createProducer(queue);

            // Copy the file content into the message body.
            BytesMessage bytesmsg = ss.createBytesMessage();
            in = new BufferedInputStream(new FileInputStream(sourcefile));

            byte[] buf = new byte[BUFFER_BYTE_LENGTH];
            int bytesread;

            while ((bytesread = in.read(buf)) != -1) {
                bytesmsg.writeBytes(buf, 0, bytesread);
            }

            prod.send(bytesmsg);
            prod.close();

            // Commit the transaction.
            ss.commit();

            System.out.println("ÏûÏ¢·¢Ëͳɹ¦");

        } catch (Exception e) {

            System.out.println("ÏûÏ¢·¢ËÍʧ°Ü");

            // Keep the cause so the original stack trace is not lost.
            failure = new Exception(e.getMessage(), e);
            throw failure;

        } finally {

            // Both resources are always attempted; the first close failure is remembered and
            // only reported if nothing else has already failed.
            Exception closeFailure = null;

            if (in != null) {
                try {
                    in.close();
                } catch (IOException ioe) {
                    closeFailure = new Exception(ioe);
                }
            }

            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    if (closeFailure == null) {
                        closeFailure = new Exception(e.getMessage(), e);
                    }
                }
            }

            if (failure == null && closeFailure != null) {
                throw closeFailure;
            }

        }

    }

    /**
     * Receives one message from the configured queue.
     * <p>
     * Opens a new connection, creates a session with the transacted flag set and a consumer,
     * waits in {@code receive()} for a message, commits, and prints the elapsed time. The
     * connection is closed before returning.
     *
     * @return the message returned by the consumer's {@code receive()} call
     * @throws Exception wrapping any {@code JMSException} raised while receiving or while
     *                   closing the connection
     */
    public Message recieveMsg() throws Exception {

        // Create the connection.
        final ActiveMQConnection connection = getConnection(getIp(), getPort());

        try {
            // Open a session whose transaction is controlled by this method.
            final Session rawSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            final ActiveMQSession txSession = (ActiveMQSession) rawSession;

            final Queue source = txSession.createQueue(getQueName());
            final ActiveMQMessageConsumer consumer =
                (ActiveMQMessageConsumer) txSession.createConsumer(source);

            System.out.println("¿ªÊ¼½ÓÊÕÏûÏ¢...");

            final long begin = System.currentTimeMillis();
            final Message received = consumer.receive();
            txSession.commit();
            final long elapsed = System.currentTimeMillis() - begin;

            System.out.println("ÏûÏ¢½ÓÊÕÍê±Ï,¹²»¨Ê±: " + elapsed + " ms");

            return received;
        } catch (JMSException receiveFailure) {
            System.out.println("½ÓÊÕÏûÏ¢Òì³£");
            throw new Exception(receiveFailure);
        } finally {
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                throw new Exception(closeFailure);
            }
        }

    }

    /**
     * Receives one message via {@code recieveMsg()} and returns its text body.
     * <p>
     * The received message is cast to {@code TextMessage} without a type check.
     *
     * @return the text returned by {@code getText()}
     * @throws Exception if receiving fails, or wrapping the {@code JMSException} raised while
     *                   reading the text
     */
    public String recieveStrMsg() throws Exception {

        final Message received = recieveMsg();
        final TextMessage textMessage = (TextMessage) received;

        try {
            final String body = textMessage.getText();
            return body;
        } catch (JMSException readFailure) {
            throw new Exception(readFailure);
        }

    }

    /**
     * Intended to receive a text message and deserialize its XML body into an object.
     * <p>
     * Currently a stub: the XStream-based implementation below is commented out and the
     * method always returns {@code null} without receiving anything.
     *
     * @return always {@code null} in the current implementation
     * @throws Exception declared by the signature; the current body throws nothing
     */

    public Object recieveXmlToObjMsg() throws Exception {
        //
        // String xml = recieveStrMsg();
        //
        // XStream stream = new XStream(new DomDriver());
        //
        // return stream.fromXML(xml);
        return null;

    }

    /**
     * Intended to receive a text message and deserialize its JSON body into an object.
     * <p>
     * Currently a stub: the XStream-based implementation below is commented out and the
     * method always returns {@code null} without receiving anything.
     *
     * @return always {@code null} in the current implementation
     * @throws Exception declared by the signature; the current body throws nothing
     */

    public Object recieveJsonToObjMsg() throws Exception {
        //
        // String json = recieveStrMsg();
        //
        // XStream stream = new XStream(new JsonHierarchicalStreamDriver());
        //
        // return stream.fromXML(json);

        return null;
    }

    /**
     * Receives one message via {@code recieveMsg()} and returns the object it carries.
     * <p>
     * The received message is cast to {@code ObjectMessage} without a type check.
     *
     * @return the object returned by {@code getObject()}
     * @throws Exception if receiving fails, or wrapping the {@code JMSException} raised while
     *                   extracting the object
     */
    public Serializable recieveSerializedObjMsg() throws Exception {

        final Message received = recieveMsg();
        final ObjectMessage objectMessage = (ObjectMessage) received;

        try {
            final Serializable payload = objectMessage.getObject();
            return payload;
        } catch (JMSException readFailure) {
            throw new Exception(readFailure);
        }

    }

    /**
     * Receives one {@code BytesMessage} via {@code recieveMsg()} and writes its body to a file.
     * <p>
     * The body is copied in chunks of {@code BUFFER_BYTE_LENGTH} bytes until
     * {@code readBytes} reports end of data with {@code -1}. The received message is cast to
     * {@code BytesMessage} without a type check.
     *
     * @param filepath path of the file to write
     * @return the {@code File} that was written
     * @throws Exception if receiving fails; or wrapping (once) any failure raised while
     *                   writing the file, or while closing it after an otherwise successful
     *                   write
     */
    public File recieveFileMsg(String filepath) throws Exception {

        BytesMessage msg = (BytesMessage) recieveMsg();

        BufferedOutputStream fos = null;

        // Set once the body has been fully written, so that a close() error in finally is
        // reported only when it is the sole failure.
        boolean written = false;

        try {

            File resultFile = new File(filepath);

            fos = new BufferedOutputStream(new FileOutputStream(resultFile));

            byte[] buf = new byte[BUFFER_BYTE_LENGTH];
            int rd;

            while ((rd = msg.readBytes(buf)) != -1) {
                fos.write(buf, 0, rd);
            }

            fos.flush();
            written = true;

            return resultFile;

        } catch (Exception ex) {

            // Wrap exactly once; the root cause stays one getCause() away.
            throw new Exception(ex);

        } finally {

            if (fos != null) {

                try {

                    fos.close();

                } catch (IOException io) {

                    if (written) {
                        throw new Exception(io);
                    }
                    // Otherwise a write failure is already propagating; do not mask it.

                }

            }

        }

    }

    /**
     * Returns a started MQ connection to the given endpoint.
     * <p>
     * Validates the arguments, then delegates to
     * {@code getConnection(String, String, Integer)}, passing the current thread's name as
     * the first argument.
     *
     * @param ip   host name or IP address of the MQ server; must not be {@code null} or empty
     * @param port TCP port of the MQ server; must not be {@code null}
     * @return the connection produced by the three-argument overload
     * @throws Exception if an argument is missing or the connection cannot be created; the
     *                   original failure is attached as the cause
     */
    public ActiveMQConnection getConnection(String ip,
                                            Integer port) throws Exception {

        try {

            if (ip == null || ("").equals(ip) || port == null) {

                throw new JMSException("IP»ò¶Ë¿Ú²»ÄÜΪ¿Õ£¡£¡");

            }

            return getConnection(Thread.currentThread().getName(), ip, port);

        } catch (Exception e) {

            // Attach the cause so the underlying stack trace is not lost.
            throw new Exception(e.getMessage(), e);

        }

    }

    /**
     * Creates and starts a new MQ connection to {@code tcp://ip:port}.
     * <p>
     * The broker URL sets {@code wireFormat.maxInactivityDuration=90000}, and the queue
     * prefetch is set to 1. If starting the connection fails, the connection is closed before
     * the failure propagates, so a failed start does not leave it behind.
     *
     * @param cid  caller-supplied identifier; NOTE: currently not used by this method
     * @param ip   host name or IP address of the MQ server
     * @param port TCP port of the MQ server
     * @return a started connection; the caller is responsible for closing it
     * @throws JMSException if the connection cannot be created or started
     */
    public ActiveMQConnection getConnection(String cid,
                                            String ip,
                                            Integer port) throws JMSException {

        ActiveMQConnectionFactory cf =
            new ActiveMQConnectionFactory("tcp://" + ip + ":" + port
                + "?wireFormat.maxInactivityDuration=90000");

        cf.getPrefetchPolicy().setQueuePrefetch(1);

        ActiveMQConnection conn = (ActiveMQConnection) cf.createConnection();

        boolean started = false;

        try {

            conn.start();
            started = true;

            return conn;

        } finally {

            if (!started) {

                // The caller never sees this connection, so it must be released here.
                try {

                    conn.close();

                } catch (JMSException ignored) {

                    // The start() failure is the one worth reporting; keep it as the outcome.

                }

            }

        }

    }

}

Reply via email to