Bug in JBoss MQ
---------------

         Key: JBMQ-4
         URL: http://jira.jboss.com/jira/browse/JBMQ-4
     Project: JBoss MQ
        Type: Bug
    Versions:  JBossAS-3.2.5    
 Environment: OS - SunOS, 5.8, Java Version - 1.4.2, VM - Java Hotspot
    Reporter: Kalyan
 Assigned to: Adrian Brock 
    Priority: Critical


Hi, 

We were running a load test on a single instance of JBoss. The following thread 
on the JBossMQ forum describes the test that was conducted: 
http://www.jboss.org/index.html?module=bb&op=viewtopic&t=57189
The server became unusable in less than 30 minutes.

From the thread dump we have identified a deadlock on MessageCache's LRUCache. 
We found many instances of the dump below.

"Thread-11056" daemon prio=5 tid=0x014d8ff0 nid=0x3d17 waiting for monitor entry [5e801000..5e8019c0]
        at org.jboss.mq.server.MessageCache.validateSoftReferenceDepth(MessageCache.java:327)
        - waiting to lock <0xeddc9268> (a org.jboss.mq.server.MessageReference)
        at org.jboss.mq.server.MessageCache.addInternal(MessageCache.java:142)
        at org.jboss.mq.server.MessageCache.add(MessageCache.java:112)
        at org.jboss.mq.server.JMSQueue.addMessage(JMSQueue.java:130)
        at org.jboss.mq.server.JMSDestinationManager.addMessage(JMSDestinationManager.java:402)
        at org.jboss.mq.server.JMSDestinationManager.addMessage(JMSDestinationManager.java:378)
        at org.jboss.mq.server.JMSServerInvoker.addMessage(JMSServerInvoker.java:136)
        at org.jboss.mq.il.uil2.ServerSocketManagerHandler.handleMsg(ServerSocketManagerHandler.java:86)
        at org.jboss.mq.il.uil2.SocketManager$ReadTask.handleMsg(SocketManager.java:356)
        at org.jboss.mq.il.uil2.msgs.BaseMsg.run(BaseMsg.java:377)
        at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run(PooledExecutor.java:748)
        at java.lang.Thread.run(Thread.java:534)

There were a few instances of the dump below.

"Thread-10996" daemon prio=5 tid=0x01145a08 nid=0x3cc8 waiting for monitor entry [60201000..602019c0]
        at org.jboss.mq.server.MessageCache.validateSoftReferenceDepth(MessageCache.java:272)
        - waiting to lock <0xebdd8c78> (a org.jboss.mq.server.MessageCache$LRUCache)
        at org.jboss.mq.server.MessageCache.addInternal(MessageCache.java:142)
        at org.jboss.mq.server.MessageCache.add(MessageCache.java:112)
        at org.jboss.mq.server.JMSQueue.addMessage(JMSQueue.java:130)
        at org.jboss.mq.server.JMSDestinationManager.addMessage(JMSDestinationManager.java:402)
        at org.jboss.mq.server.JMSDestinationManager.addMessage(JMSDestinationManager.java:378)
        at org.jboss.mq.server.JMSServerInvoker.addMessage(JMSServerInvoker.java:136)
        at org.jboss.mq.il.uil2.ServerSocketManagerHandler.handleMsg(ServerSocketManagerHandler.java:86)
        at org.jboss.mq.il.uil2.SocketManager$ReadTask.handleMsg(SocketManager.java:356)
        at org.jboss.mq.il.uil2.msgs.BaseMsg.run(BaseMsg.java:377)
        at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run(PooledExecutor.java:748)
        at java.lang.Thread.run(Thread.java:534)

We traced the deadlock to the method validateSoftReferenceDepth of MessageCache, 
commented out the calls to that method, and ran the server again with the same 
configuration. This time the server ran for 2.5 days under a load of 225 
clients, each hitting the server every 100 seconds, and there were no problems. 
So it looks like there is a problem with the implementation of this method. 
I cannot provide that thread dump right now, but I remember seeing a lock held 
on the ConcurrentHashTable (or some similar class) of the EDU.oswego package. 
Many threads were trying to put messages into the MessageCache and could not 
acquire that lock. 
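
As an illustration only: one common way for threads to end up "waiting for 
monitor entry" forever is two threads taking the same pair of monitors in 
opposite order. The dumps above do not establish that this is what happens 
inside validateSoftReferenceDepth. The sketch below reproduces that generic 
pattern and detects it from inside the JVM. The names DeadlockProbe, cacheLock 
and referenceLock are hypothetical stand-ins, and ThreadMXBean needs Java 5 or 
later, so it would not run on the 1.4.2 VM used in our test.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

final class DeadlockProbe {

    /** Starts a daemon thread that locks 'first', waits for its peer, then wants 'second'. */
    private static void lockInOrder(final Object first, final Object second,
                                    final CountDownLatch bothHoldFirst) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                synchronized (first) {
                    bothHoldFirst.countDown();
                    try {
                        // Wait until the other thread also holds its first monitor.
                        bothHoldFirst.await();
                    } catch (InterruptedException e) {
                        return;
                    }
                    synchronized (second) {
                        // Not reached: the peer holds 'second' and wants 'first'.
                    }
                }
            }
        });
        t.setDaemon(true); // lets the JVM exit although the thread never finishes
        t.start();
    }

    /** Creates an opposite-order deadlock and returns how many threads the JVM reports. */
    static int countDeadlockedThreads(long timeoutMillis) {
        Object cacheLock = new Object();      // hypothetical stand-in for a cache monitor
        Object referenceLock = new Object();  // hypothetical stand-in for a reference monitor
        CountDownLatch latch = new CountDownLatch(2);
        lockInOrder(cacheLock, referenceLock, latch);
        lockInOrder(referenceLock, cacheLock, latch);

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            long[] ids = threads.findMonitorDeadlockedThreads();
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + countDeadlockedThreads(5000));
    }
}
```

If both threads always took the two monitors in the same order, the probe would 
report nothing, which is the usual remedy for this kind of hang.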

We are using the Null Persistence Manager, as suggested in one of the wiki pages.
We are not sure why soft referencing and hard referencing happen at all when 
null persistence is used. All we want from JMS is asynchronous behaviour; we 
have no requirement for "Store and Forward". The server can dispatch the result 
and then forget about it. If a client is alive, it will take the message; 
otherwise the message is gone. This approach is fine for our functionality.
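
To make the terms concrete, here is a generic sketch of a cache that keeps the 
most recently used messages behind hard references and demotes older ones to 
soft references, which the garbage collector may clear under memory pressure. 
This is not the JBossMQ MessageCache code and does not explain why it is active 
with null persistence. TwoTierCache and all of its members are hypothetical 
names chosen for the illustration.

```java
import java.lang.ref.SoftReference;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class TwoTierCache {
    private final int hardLimit;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, String> hard =
            new LinkedHashMap<String, String>(16, 0.75f, true);
    private final Map<String, SoftReference<String>> soft =
            new HashMap<String, SoftReference<String>>();

    TwoTierCache(int hardLimit) {
        this.hardLimit = hardLimit;
    }

    /** Adds a message; anything beyond the hard limit is demoted to a soft reference. */
    synchronized void add(String id, String body) {
        soft.remove(id);
        hard.put(id, body);
        while (hard.size() > hardLimit) {
            Iterator<Map.Entry<String, String>> it = hard.entrySet().iterator();
            Map.Entry<String, String> eldest = it.next();
            soft.put(eldest.getKey(), new SoftReference<String>(eldest.getValue()));
            it.remove();
        }
    }

    /** Returns the message, or null if it is unknown or the collector cleared it. */
    synchronized String get(String id) {
        String body = hard.get(id);
        if (body != null) {
            return body;
        }
        SoftReference<String> ref = soft.get(id);
        return ref == null ? null : ref.get();
    }

    synchronized int hardCount() {
        return hard.size();
    }

    synchronized int softCount() {
        return soft.size();
    }
}
```

Every method here takes the same single monitor, so no thread ever holds one 
lock while waiting for another. That is a design choice of the sketch, not a 
statement about how MessageCache is written.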

The code we used is below. (This is a stripped-down version of our actual test 
code, but it exercises exactly the functionality that we are testing.)

EJB & JMS Code:- (Server Side)

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ ServerHome.java @@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

package test.ejb.server;

import java.rmi.RemoteException;

import javax.ejb.CreateException;
import javax.ejb.EJBHome;

public interface ServerHome extends EJBHome
{
    public Server create() throws RemoteException, CreateException;
}

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ Server.java @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

package test.ejb.server;

import java.rmi.RemoteException;

import javax.ejb.EJBObject;

public interface Server extends EJBObject
{
    public String getInformation(String correlationId, String queueName)
            throws RemoteException;
}

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ ServerBean.java @@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

package test.ejb.server;

import java.rmi.RemoteException;

import javax.ejb.EJBException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;

public class ServerBean implements SessionBean
{
    private SessionContext myCtx = null;
    private QueueDispatcher myQD = null;
    
    public void ejbCreate()
    {
        System.out.println("Creating ejb.....");
    }
    
    public String getInformation(String correlationId, String queueName)
    {
        myQD = new QueueDispatcher(correlationId,queueName);
        System.out.println("Returning information...");
        return "SUCCESS";
    }
    
    public void ejbActivate() throws EJBException, RemoteException
    {
    }

    public void ejbPassivate() throws EJBException, RemoteException
    {
    }

    public void ejbRemove() throws EJBException, RemoteException
    {
    }

    public void setSessionContext(SessionContext ctx)
            throws EJBException, RemoteException
    {
        System.out.println("Setting session context "+ctx);
        myCtx = ctx;
    }
}

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ QueueDispatcher.java @@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

package test.ejb.server;

import java.util.Hashtable;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class QueueDispatcher
{
    private final String myQueueName ;
    private final String myCorrelationId;
    private Context myInitialContext = null;
    
    private Hashtable myQueueTable = new Hashtable();
    
    public QueueDispatcher(final String correlationId, final String qName)
    {
        myQueueName = qName;
        myCorrelationId = correlationId;
        try
        {
            dispatchResult();
        }
        catch(Exception e)
        {
            System.out.println("Exception while dispatching the result....");
            e.printStackTrace();
        }
    }
    
    private void dispatchResult() throws Exception
    {
        System.out.println("Dispatching the result....");
       
        long size = 65000; // default value        
        Object sizeObj = 
            getInitialContext().lookup("java:comp/env/loadtest/replysize");

        if(sizeObj != null)
        {
            size = new Long(sizeObj.toString()).longValue();
        }
        
        char[] emptyString = new char[(int)size]; 
        StringBuffer buffer = new StringBuffer();
        buffer.append(myCorrelationId);
        buffer.append("_GS_");
        buffer.append(emptyString);
        
        writeMessage(myQueueName,buffer.toString());
    }
    
    private Context getInitialContext() throws Exception
    {
        if(myInitialContext == null)
        {
            myInitialContext = new InitialContext();
        }
        return myInitialContext;
    }
    
    private void writeMessage(String queueName, String message) throws Exception
    {
        // Obtain the sender first: getQueueSender() creates and caches the
        // holder (including its session) that getQueueInfo() reads below.
        QueueSender sender = getQueueSender(queueName);
        TextMessage tm = 
            getQueueInfo(queueName).getQueueSession().createTextMessage(message);
        sender.send(tm);
    }
    
    private QueueInfoHolder getQueueInfo(String queueName)
    {
        return (QueueInfoHolder)myQueueTable.get(queueName);
    }
    
    private QueueSender getQueueSender(String queueName) throws Exception
    {
        QueueInfoHolder holder = getQueueInfo(queueName);
        
        if(holder != null)
        {
            return holder.getQueueSender();
        }
        
        holder = new QueueInfoHolder();
        
        Queue queue = (Queue)getInitialContext().lookup(queueName);
        // conn factory name
        String connFactoryName = "javax.jms.QueueConnectionFactory";
        
        QueueConnectionFactory factory = 
            (QueueConnectionFactory)getInitialContext().lookup(connFactoryName);
        QueueConnection connection = 
            (QueueConnection)factory.createQueueConnection();
        QueueSession session = 
            (QueueSession)connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
        QueueSender sender = (QueueSender)session.createSender(queue);
        holder.setQueueSender(sender);
        // Store the session too; writeMessage() uses it to create messages.
        holder.setQueueSession(session);
        myQueueTable.put(queueName,holder);
        
        return sender;
    }
}

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ QueueInfoHolder.java @@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
package test.ejb.server;

import javax.jms.QueueSender;
import javax.jms.QueueSession;

class QueueInfoHolder
{
    private QueueSender mySender;
    private QueueSession mySession;

    QueueInfoHolder()
    {
        mySender = null;
        mySession = null;
    }

    public void setQueueSender(QueueSender sender)
    {
        mySender = sender;
    }
    
    public QueueSender getQueueSender()
    {
        return mySender;
    }
    
    public void setQueueSession(QueueSession session)
    {
        mySession = session;
    }
    
    public QueueSession getQueueSession()
    {
        return mySession;
    }
}


@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ ejb-jar.xml @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

<?xml version="1.0"?>
<!DOCTYPE ejb-jar PUBLIC '-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 1.1//EN'
    'http://java.sun.com/j2ee/dtds/ejb-jar_1_1.dtd'>
<ejb-jar>
    <enterprise-beans>
        <session>
            <ejb-name>server.Server</ejb-name>
            <home>test.ejb.server.ServerHome</home>
            <remote>test.ejb.server.Server</remote>
            <ejb-class>test.ejb.server.ServerBean</ejb-class>
            <session-type>Stateless</session-type>
            <transaction-type>Container</transaction-type>
            <env-entry>
                <env-entry-name>loadtest/replysize</env-entry-name>
                <env-entry-type>java.lang.Long</env-entry-type>
                <env-entry-value>100000</env-entry-value>
            </env-entry>
            <env-entry>
                <env-entry-name>loadtest/jms/connectionfactory</env-entry-name>
                <env-entry-type>java.lang.String</env-entry-type>
                <env-entry-value>javax.jms.QueueConnectionFactory</env-entry-value>
            </env-entry>
        </session>
    </enterprise-beans>
</ejb-jar>

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

Client Code: - 

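Run this client after setting the system property run.sleep.time to 10000 (10 
secs) or another appropriate value. The client sleeps for this many 
milliseconds between calls, so with 10000 it hits the server every 10 seconds. 
If the property is not set, the client uses 5000 (5 secs).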

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ EJBClient.java @@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
package test.ejb.client;

import java.rmi.RMISecurityManager;
import java.util.Properties;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.rmi.PortableRemoteObject;

import test.ejb.server.Server;
import test.ejb.server.ServerHome;

public class EJBClient
{
    private final String SERVER_URL = "jnp://localhost:1099";
    private String myQueueName = null;
    private static int myCounter = 0;
    
    public EJBClient() throws Exception
    {
        int count = 0;

        Thread t = Thread.currentThread();
        while (true)
        {
            run();
            count++;
            try
            {
                long sleepTime = Long.getLong("run.sleep.time", 5000)
                        .longValue();
                Thread.sleep(sleepTime);
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        
    }
    
    private void run() throws Exception
    {
        Properties p = System.getProperties();
        p.put(Context.PROVIDER_URL,SERVER_URL);
        p.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.jnp.interfaces.NamingContextFactory");
        Context context = new InitialContext(p);
        
        Object obj = context.lookup("ejbtest.Server");
        
        ServerHome home = 
            (ServerHome)PortableRemoteObject.narrow(obj,ServerHome.class);
        Server server = home.create();
        // Remember the reply queue name; generateCorrelationId() embeds it.
        myQueueName = 
            DefaultJMSQueueDispatcher.getInstance().findOrCreateQueue(SERVER_URL);
        // Server.getInformation expects (correlationId, queueName), in that order.
        String result = 
            server.getInformation(generateCorrelationId(),myQueueName);
    }
    
    public synchronized String generateCorrelationId()
    {
        return (myCounter++) + "_" + myQueueName;
    }
    
    public static void main(String[] args)
    {
        System.setSecurityManager(new RMISecurityManager());
        try
        {
            new EJBClient();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ DefaultJMSQueueDispatcher.java @@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

package test.ejb.client;

import java.util.Hashtable;
import java.util.Properties;

import javax.jms.*;
import javax.naming.*;

public class DefaultJMSQueueDispatcher implements MessageListener
{
    private Hashtable myQueueTable;
    private static DefaultJMSQueueDispatcher ourQueueDispatcher = null;
    public static final String FACTORY_URL_PKG =
            "org.jboss.naming:org.jnp.interfaces";
    public static final String FACTORY_URL_PROPERTY =
            "java.naming.factory.url.pkgs";

    private DefaultJMSQueueDispatcher()
    {
    }

    public static synchronized DefaultJMSQueueDispatcher getInstance()
    {
        if (ourQueueDispatcher == null)
        {
            ourQueueDispatcher = new DefaultJMSQueueDispatcher();
        }
        return ourQueueDispatcher;
    }

    public void onMessage(Message msg)
    {
        try
        {
            String message = ((TextMessage) msg).getText();
            String corrId = message.substring(0, message.indexOf("_GS_"));
            System.out.println("Received Message for Correlation Id: "
                    + corrId + "\n");
        } 
        catch (Exception exp)
        {
            exp.printStackTrace();
        }
    }

    private MyQueueConnectionHolder createQueue(String serverUrl)
            throws NamingException, JMSException
    {
        Context context = getInitialContext(serverUrl);
        QueueConnectionFactory qconFactory = (QueueConnectionFactory) context
                .lookup("javax.jms.QueueConnectionFactory");

        QueueConnection connection = qconFactory.createQueueConnection();
        connection.setExceptionListener(new JMSConnectionExceptionListener(
                serverUrl));
        QueueSession session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue) session.createTemporaryQueue();

        connection.start();
        QueueReceiver receiver = session.createReceiver(queue);
        receiver.setMessageListener(new MyMessageListener());

        try
        {
            context.rebind(queue.getQueueName(), queue);
        } catch (NamingException nmexp)
        {
            nmexp.printStackTrace();
        }

        MyQueueConnectionHolder holder = new MyQueueConnectionHolder(session,
                receiver, queue, connection);
        myQueueTable.put(serverUrl, holder);
        return holder;
    }

    public synchronized String findOrCreateQueue(String serverUrl)
            throws NamingException, JMSException
    {
        Object queue;
        if (myQueueTable == null)
        {
            myQueueTable = new Hashtable();
            queue = createQueue(serverUrl);
        } else
        {
            queue = myQueueTable.get(serverUrl);
            if (queue == null)
            {
                queue = createQueue(serverUrl);
            }
        }
        return ((MyQueueConnectionHolder) queue).myQueue.getQueueName();
    }

    public void close(String serverUrl) throws JMSException
    {
        System.out.println("JMSDispatcher::close Closing Queue " + serverUrl);
        Object queue = myQueueTable.get(serverUrl);
        myQueueTable.remove(serverUrl);
        if (queue != null)
        {
            ((MyQueueConnectionHolder) queue).myReciever.close();
            ((MyQueueConnectionHolder) queue).mySession.close();
            ((MyQueueConnectionHolder) queue).myConnection.close();
        }
    }

    private Context getInitialContext(String url) throws NamingException
    {
        Properties p = System.getProperties();
        p.put(Context.PROVIDER_URL,"jnp://localhost:1099");
        
        p.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.jnp.interfaces.NamingContextFactory");
        p.put(FACTORY_URL_PROPERTY, FACTORY_URL_PKG);
        return new InitialContext(p);
    }

    private class MyQueueConnectionHolder
    {
        public Queue myQueue;
        public Connection myConnection;
        public Session mySession;
        public QueueReceiver myReciever;

        public MyQueueConnectionHolder(Session session, QueueReceiver reciever,
                Queue queue, Connection connection)
        {
            mySession = session;
            myReciever = reciever;
            myQueue = queue;
            myConnection = connection;
        }
    }

    private class JMSConnectionExceptionListener implements ExceptionListener
    {
        private String myUrl;

        public JMSConnectionExceptionListener(String serverUrl)
        {
            myUrl = serverUrl;
        }

        public void onException(JMSException exp)
        {
            try
            {
                close(myUrl);
            } catch (JMSException jmsexp)
            {
                // Report the failure to close as well as the original error.
                jmsexp.printStackTrace();
                exp.printStackTrace();
            }
        }
    }
}

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@ MyMessageListener.java @@@@@@@@@@@@@@@@@@@@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

package test.ejb.client;

import javax.jms.*;

public class MyMessageListener implements MessageListener
{
    public void onMessage(Message msg)
    {
        try
        {
            String message = ((TextMessage)msg).getText();
            long endTime = System.currentTimeMillis();
            String corrId = message.substring(0,message.indexOf("_GS_"));
            System.out.println("Received Message for Correlation Id: "
                    + corrId);
        } 
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}
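
The reply format used in the code above (the correlation id, the "_GS_" 
separator, then padding characters) can be checked without a JMS provider. The 
helper below is a hypothetical sketch, not part of our test code: ReplyFormat, 
build and correlationIdOf are names invented for it. Unlike the listeners 
above, it returns null instead of throwing when the separator is missing.

```java
final class ReplyFormat {
    static final String SEPARATOR = "_GS_";

    /** Builds "<correlationId>_GS_" followed by paddingChars NUL characters. */
    static String build(String correlationId, int paddingChars) {
        StringBuilder sb = new StringBuilder(
                correlationId.length() + SEPARATOR.length() + paddingChars);
        sb.append(correlationId).append(SEPARATOR);
        // A fresh char[] is filled with '\u0000', as in QueueDispatcher.
        sb.append(new char[paddingChars]);
        return sb.toString();
    }

    /** Returns the text before the first separator, or null if there is none. */
    static String correlationIdOf(String message) {
        int idx = message.indexOf(SEPARATOR);
        return idx < 0 ? null : message.substring(0, idx);
    }

    public static void main(String[] args) {
        String reply = build("7_queue", 100);
        System.out.println(correlationIdOf(reply) + " / length " + reply.length());
    }
}
```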


-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://jira.jboss.com/jira/secure/Administrators.jspa
-
If you want more information on JIRA, or have a bug to report see:
   http://www.atlassian.com/software/jira


