[Java Client] Use of thread-unsafe HashMap for destination->consumer causes 
timeouts
------------------------------------------------------------------------------------

                 Key: QPID-1124
                 URL: https://issues.apache.org/jira/browse/QPID-1124
             Project: Qpid
          Issue Type: Bug
          Components: Java Client
    Affects Versions: M3
            Reporter: Rob Godfrey
            Assignee: Rob Godfrey
             Fix For: M3


When testing the trunk client against the C++ broker, I found I occaisionally 
got 

 java.lang.RuntimeException: timed out waiting for sync
        at org.apache.qpidity.transport.Session.sync(Session.java:334)
        at org.apache.qpidity.transport.Session.sync(Session.java:293)
        at 
org.apache.qpidity.nclient.impl.ClientSession.sync(ClientSession.java:107)
        at 
org.apache.qpid.client.AMQSession_0_10.sendConsume(AMQSession_0_10.java:407)
        at 
org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2078)
        at 
org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:2381)
        at org.apache.qpid.client.AMQSession.access$500(AMQSession.java:110)
        at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:1648)
        at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:1620)
        at 
org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:119)
        at 
org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:1618)

>From the following test:

        AMQConnection con = new AMQConnection("amqp://username:[EMAIL 
PROTECTED]/test?brokerlist='tcp://127.0.0.1:5672'");
        TopicSession session1 = con.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
        AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), 
"death");
        TopicPublisher publisher = session1.createPublisher(topic);


        MessageListener messageListener = new MessageListener()
        {
            public void onMessage(Message message)
            {
            }
        };

        List<TopicSubscriber> listenerList = new ArrayList<TopicSubscriber>();

        TopicSubscriber listener = session1.createSubscriber(topic);
        listener.setMessageListener(messageListener);
        listenerList.add(listener);


        con.start();

        for(int i=0; i < 10000 ; i++)
        {
            BytesMessage outmsg = session1.createBytesMessage();
            final byte[] bytes = new byte[1024];
            outmsg.writeBytes(bytes);
            publisher.send(outmsg);

            System.out.println("Sent " + i + "messages") ;

            listener = session1.createSubscriber(topic);
            listener.setMessageListener(messageListener);
            listenerList.add(listener);

        }

The underlying cause is an exception in the MINAHandler (which MINA swallows):

java.lang.NullPointerException
        at 
org.apache.qpidity.nclient.impl.ClientSessionDelegate.messageTransfer(ClientSessionDelegate.java:63)
        at 
org.apache.qpidity.nclient.impl.ClientSessionDelegate.messageTransfer(ClientSessionDelegate.java:20)
        at 
org.apache.qpidity.transport.MessageTransfer.dispatch(MessageTransfer.java:59)
        at 
org.apache.qpidity.transport.SessionDelegate.command(SessionDelegate.java:44)
        at 
org.apache.qpidity.transport.SessionDelegate.command(SessionDelegate.java:32)
        at org.apache.qpidity.transport.Method.delegate(Method.java:75)
        at org.apache.qpidity.transport.Channel.command(Channel.java:104)
        at org.apache.qpidity.transport.Channel.command(Channel.java:43)
        at org.apache.qpidity.transport.Method.delegate(Method.java:75)
        at org.apache.qpidity.transport.Channel.received(Channel.java:75)
        at org.apache.qpidity.transport.Connection.received(Connection.java:82)
        at org.apache.qpidity.transport.Connection.received(Connection.java:43)
        at 
org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:92)
        at 
org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:97)
        at 
org.apache.qpidity.transport.network.Assembler.assemble(Assembler.java:160)
        at 
org.apache.qpidity.transport.network.Assembler.frame(Assembler.java:129)
        at org.apache.qpidity.transport.network.Frame.delegate(Frame.java:145)
        at 
org.apache.qpidity.transport.network.Assembler.received(Assembler.java:102)
        at 
org.apache.qpidity.transport.network.Assembler.received(Assembler.java:51)
        at 
org.apache.qpidity.transport.network.InputHandler.frame(InputHandler.java:103)
        at 
org.apache.qpidity.transport.network.InputHandler.next(InputHandler.java:204)
        at 
org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:116)
        at 
org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:41)
        at 
org.apache.qpidity.transport.network.mina.MinaHandler.messageReceived(MinaHandler.java:84)

And this is caused by the fact that the _messageListeners Map on ClientSession 
is not thread safe; but is being access simultaneously by more than one thread

Fix is to use a ConcurrentHashMap

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to