[Java Client] Use of thread-unsafe HashMap for destination->consumer causes
timeouts
------------------------------------------------------------------------------------
Key: QPID-1124
URL: https://issues.apache.org/jira/browse/QPID-1124
Project: Qpid
Issue Type: Bug
Components: Java Client
Affects Versions: M3
Reporter: Rob Godfrey
Assignee: Rob Godfrey
Fix For: M3
When testing the trunk client against the C++ broker, I found I occasionally
got
java.lang.RuntimeException: timed out waiting for sync
    at org.apache.qpidity.transport.Session.sync(Session.java:334)
    at org.apache.qpidity.transport.Session.sync(Session.java:293)
    at org.apache.qpidity.nclient.impl.ClientSession.sync(ClientSession.java:107)
    at org.apache.qpid.client.AMQSession_0_10.sendConsume(AMQSession_0_10.java:407)
    at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2078)
    at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:2381)
    at org.apache.qpid.client.AMQSession.access$500(AMQSession.java:110)
    at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:1648)
    at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:1620)
    at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:119)
    at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:1618)
From the following test:
    AMQConnection con = new AMQConnection("amqp://username:[EMAIL PROTECTED]/test?brokerlist='tcp://127.0.0.1:5672'");
    TopicSession session1 = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "death");
    TopicPublisher publisher = session1.createPublisher(topic);

    // No-op listener shared by every subscriber.
    MessageListener messageListener = new MessageListener()
    {
        public void onMessage(Message message)
        {
        }
    };

    List<TopicSubscriber> listenerList = new ArrayList<TopicSubscriber>();
    TopicSubscriber listener = session1.createSubscriber(topic);
    listener.setMessageListener(messageListener);
    listenerList.add(listener);
    con.start();

    for (int i = 0; i < 10000; i++)
    {
        BytesMessage outmsg = session1.createBytesMessage();
        final byte[] bytes = new byte[1024];
        outmsg.writeBytes(bytes);
        publisher.send(outmsg);
        System.out.println("Sent " + i + " messages");

        // Register another subscriber while messages are still being delivered.
        listener = session1.createSubscriber(topic);
        listener.setMessageListener(messageListener);
        listenerList.add(listener);
    }
The underlying cause is an exception in the MinaHandler (which MINA swallows):
java.lang.NullPointerException
    at org.apache.qpidity.nclient.impl.ClientSessionDelegate.messageTransfer(ClientSessionDelegate.java:63)
    at org.apache.qpidity.nclient.impl.ClientSessionDelegate.messageTransfer(ClientSessionDelegate.java:20)
    at org.apache.qpidity.transport.MessageTransfer.dispatch(MessageTransfer.java:59)
    at org.apache.qpidity.transport.SessionDelegate.command(SessionDelegate.java:44)
    at org.apache.qpidity.transport.SessionDelegate.command(SessionDelegate.java:32)
    at org.apache.qpidity.transport.Method.delegate(Method.java:75)
    at org.apache.qpidity.transport.Channel.command(Channel.java:104)
    at org.apache.qpidity.transport.Channel.command(Channel.java:43)
    at org.apache.qpidity.transport.Method.delegate(Method.java:75)
    at org.apache.qpidity.transport.Channel.received(Channel.java:75)
    at org.apache.qpidity.transport.Connection.received(Connection.java:82)
    at org.apache.qpidity.transport.Connection.received(Connection.java:43)
    at org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:92)
    at org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:97)
    at org.apache.qpidity.transport.network.Assembler.assemble(Assembler.java:160)
    at org.apache.qpidity.transport.network.Assembler.frame(Assembler.java:129)
    at org.apache.qpidity.transport.network.Frame.delegate(Frame.java:145)
    at org.apache.qpidity.transport.network.Assembler.received(Assembler.java:102)
    at org.apache.qpidity.transport.network.Assembler.received(Assembler.java:51)
    at org.apache.qpidity.transport.network.InputHandler.frame(InputHandler.java:103)
    at org.apache.qpidity.transport.network.InputHandler.next(InputHandler.java:204)
    at org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:116)
    at org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:41)
    at org.apache.qpidity.transport.network.mina.MinaHandler.messageReceived(MinaHandler.java:84)
This is caused by the fact that the _messageListeners Map on ClientSession
is not thread-safe, but is being accessed simultaneously by more than one thread.
The fix is to use a ConcurrentHashMap.
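
A minimal sketch of the kind of change described, not the actual ClientSession code. The class ListenerRegistry, its Listener interface, and the String destination key are all hypothetical stand-ins. Only the field name _messageListeners and the switch from HashMap to ConcurrentHashMap come from this report.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the listener bookkeeping on ClientSession.
class ListenerRegistry {

    // Hypothetical listener type; the real client uses its own interface.
    interface Listener {
        void onMessage(String body);
    }

    // Before the fix this would have been a plain HashMap, which is not safe
    // when one thread registers consumers while another dispatches messages.
    private final Map<String, Listener> _messageListeners =
            new ConcurrentHashMap<String, Listener>();

    // Called from the application thread when a consumer is created.
    void register(String destination, Listener listener) {
        _messageListeners.put(destination, listener);
    }

    // Called from the I/O thread when a message arrives.
    // Returns false instead of throwing if no listener is registered.
    boolean dispatch(String destination, String body) {
        Listener listener = _messageListeners.get(destination);
        if (listener == null) {
            return false;
        }
        listener.onMessage(body);
        return true;
    }

    int size() {
        return _messageListeners.size();
    }
}
```

The null check in dispatch is an extra assumption of this sketch. It makes a missing listener visible to the caller rather than surfacing as a NullPointerException that the I/O layer may swallow.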