Hi,
I am using the 0.16 codeline and am working with the Java Broker and JMS
client.
I have the following scenario:
1) Create a transacted session.
2) Create queue A.
3) Create a consumer and register a callback handler on queue A using
the transacted session.
4) Enqueue a message to queue A.
5) Close the consumer from the initialization thread.
6) In the handler's onMessage() callback, try to roll back the session.
The consumer.close() call from the initialization thread and the rollback()
call from the handler thread end up in a deadlock.
I've attached a test, which simulates the scenario explained above.
I investigated this a little further and found the two threads that
get into the deadlock.
The deadlock involves the failovermutex() lock and the session dispatcher lock.
The consumer thread ("main") grabs the failovermutex() lock first and then
waits on the session dispatcher lock,
whereas the handler thread ("Dispatcher-Channel-0") holds the dispatcher lock
and then tries to grab the failovermutex() lock.
I've attached the actual stack traces of the deadlock, along with the list of
locks held by each thread.
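To make the lock ordering easier to discuss, here is a minimal sketch of the
same pattern outside of Qpid. It is not the client code: the class
LockOrderSketch, its methods and the two lock variables are hypothetical
stand-ins, and it uses ReentrantLock with a timed tryLock() instead of
synchronized monitors so that the inverted case times out rather than hanging.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderSketch {
    // Takes `first`, waits until every expected participant has signalled `ready`,
    // then tries `second` with a timeout instead of blocking forever.
    static boolean acquireInOrder(ReentrantLock first, ReentrantLock second,
                                  CountDownLatch ready) throws InterruptedException {
        first.lock();
        try {
            ready.countDown();
            ready.await();
            if (!second.tryLock(200, TimeUnit.MILLISECONDS)) {
                return false; // a plain synchronized block would wait here indefinitely
            }
            second.unlock();
            return true;
        } finally {
            first.unlock();
        }
    }

    // Runs a "close" path and a "rollback" path; returns true if both obtained both locks.
    static boolean bothComplete(boolean inverted) throws Exception {
        // Hypothetical stand-ins for the two monitors seen in the thread dump.
        final ReentrantLock failoverMutex = new ReentrantLock();
        final ReentrantLock dispatcherLock = new ReentrantLock();
        final ReentrantLock rollbackFirst = inverted ? dispatcherLock : failoverMutex;
        final ReentrantLock rollbackSecond = inverted ? failoverMutex : dispatcherLock;
        // Inverted case: force both threads to hold their first lock before continuing.
        final CountDownLatch ready = new CountDownLatch(inverted ? 2 : 1);

        Callable<Boolean> closePath =
                () -> acquireInOrder(failoverMutex, dispatcherLock, ready);
        Callable<Boolean> rollbackPath =
                () -> acquireInOrder(rollbackFirst, rollbackSecond, ready);

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Boolean> close = pool.submit(closePath);
            Future<Boolean> rollback = pool.submit(rollbackPath);
            boolean closeOk = close.get();
            boolean rollbackOk = rollback.get();
            return closeOk && rollbackOk;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("same order, both complete: " + bothComplete(false));
        System.out.println("inverted order, both complete: " + bothComplete(true));
    }
}
```

When both paths take the locks in the same order, the second thread simply
waits for the first to finish. With the inverted order, each thread holds the
lock the other one needs, so at least one of the timed attempts has to give up.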
I'm curious whether this is an issue only when using the Java broker, as I see
that
the sync() in the rollback call actually tries to set an exception (please
see the attached stack trace file).
It would be great if someone could let me know what is going on and help
fix this.
I've logged a JIRA for this here:
https://issues.apache.org/jira/browse/QPID-3911
Thanks a lot,
--
-Praveen
Found one Java-level deadlock:
=============================
"Dispatcher-Channel-0":
  waiting to lock monitor 0x0000000001e65ec8 (object 0x00000007c180bd58, a java.lang.Object),
  which is held by "main"
"main":
  waiting to lock monitor 0x0000000001cffbc8 (object 0x00000007c2e10c08, a java.lang.Object),
  which is held by "Dispatcher-Channel-0"
Java stack information for the threads listed above:
===================================================
"Dispatcher-Channel-0":
    at org.apache.qpid.client.AMQConnection.exceptionReceived(AMQConnection.java:1255)
    - waiting to lock <0x00000007c180bd58> (a java.lang.Object)
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1057)
    at org.apache.qpid.client.AMQSession_0_10.sync(AMQSession_0_10.java:1034)
    at org.apache.qpid.client.AMQSession_0_10.sendSuspendChannel(AMQSession_0_10.java:851)
    at org.apache.qpid.client.AMQSession.suspendChannel(AMQSession.java:3075)
    - locked <0x00000007c2c3d330> (a java.lang.Object)
    at org.apache.qpid.client.AMQSession.rollback(AMQSession.java:1854)
    - locked <0x00000007c2c3d330> (a java.lang.Object)
    at QpidConsumerCloseRollbackDeadlock$QpidMqHandler.onMessage(QpidConsumerCloseRollbackDeadlock.java:208)
    at org.apache.qpid.client.BasicMessageConsumer.notifyMessage(BasicMessageConsumer.java:745)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:141)
    at org.apache.qpid.client.BasicMessageConsumer.notifyMessage(BasicMessageConsumer.java:719)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:186)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:54)
    at org.apache.qpid.client.AMQSession$Dispatcher.notifyConsumer(AMQSession.java:3467)
    at org.apache.qpid.client.AMQSession$Dispatcher.dispatchMessage(AMQSession.java:3406)
    - locked <0x00000007c2c3d350> (a java.lang.Object)
    - locked <0x00000007c2e10c08> (a java.lang.Object)
    at org.apache.qpid.client.AMQSession$Dispatcher.access$1000(AMQSession.java:3180)
    at org.apache.qpid.client.AMQSession.dispatch(AMQSession.java:3173)
    at org.apache.qpid.client.message.UnprocessedMessage.dispatch(UnprocessedMessage.java:54)
    at org.apache.qpid.client.AMQSession$Dispatcher.run(AMQSession.java:3329)
    at java.lang.Thread.run(Thread.java:636)
"main":
    at org.apache.qpid.client.AMQSession$Dispatcher.rejectPending(AMQSession.java:3211)
    - waiting to lock <0x00000007c2e10c08> (a java.lang.Object)
    at org.apache.qpid.client.AMQSession.confirmConsumerCancelled(AMQSession.java:903)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.sendCancel(BasicMessageConsumer_0_10.java:170)
    at org.apache.qpid.client.BasicMessageConsumer.close(BasicMessageConsumer.java:593)
    - locked <0x00000007c180bd58> (a java.lang.Object)
    at org.apache.qpid.client.BasicMessageConsumer.close(BasicMessageConsumer.java:555)
    at QpidConsumerCloseRollbackDeadlock.main(QpidConsumerCloseRollbackDeadlock.java:77)
Found 1 deadlock.
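A report like the one above can also be produced from inside the JVM, which
might be handy if the test is run unattended. The following is only a sketch:
the class DeadlockProbe and its method are hypothetical helpers and are not
part of the attached test. It relies on the standard
java.lang.management.ThreadMXBean API and only looks at monitor
(synchronized) deadlocks, which is the kind shown in the dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockProbe {
    // Returns one line per thread that is deadlocked on an object monitor,
    // or an empty string when the JVM reports no such deadlock.
    static String describeDeadlocks() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.findMonitorDeadlockedThreads(); // null when there is none
        if (ids == null) {
            return "";
        }
        StringBuilder report = new StringBuilder();
        for (ThreadInfo info : threads.getThreadInfo(ids)) {
            if (info == null) {
                continue; // the thread terminated in the meantime
            }
            report.append('"').append(info.getThreadName()).append('"')
                  .append(" is waiting to lock ").append(info.getLockName())
                  .append(", which is held by \"")
                  .append(info.getLockOwnerName()).append("\"\n");
        }
        return report.toString();
    }

    public static void main(String[] args) {
        String report = describeDeadlocks();
        System.out.println(report.isEmpty() ? "No monitor deadlock detected." : report);
    }
}
```

In the test, something like this could be called from a separate watchdog
thread after the latch wait, since both the "main" and the dispatcher thread
are blocked once the deadlock has happened.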
import java.util.*;
import java.util.concurrent.*;
import javax.jms.*;
import javax.naming.*;
class QpidConsumerCloseRollbackDeadlock {
    private transient Connection connection;
    transient Session session;
    private transient MessageProducer emptyProducer;
    // The URL used to connect to the broker.
    private static String connUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:5672?tcp_nodelay='true''&max_prefetch='1'";
    final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
    final String CONNECTION_JNDI_NAME = "local";
    private InitialContext _ctx;
    Map<String, Destination> queueNameToDestination = new HashMap<String, Destination>();
    // The options used when creating a new queue.
    private static String options = ";{create: always , node : {type : queue, durable : true}}";
    public static void main(String[] args) {
        String queueName = "TestingQ";
        QpidConsumerCloseRollbackDeadlock test = null;
        MessageConsumer consumer = null;
        QpidConsumerCloseRollbackDeadlock.QpidMqHandler handler = null;
        try {
            test = new QpidConsumerCloseRollbackDeadlock();
            // Open a connection and a transacted session to the broker.
            test.open();
            CountDownLatch latch = new CountDownLatch(1);
            // Create the queue in the broker.
            test.createQueue(queueName);
            // Register the handler for the queue.
            handler = new QpidMqHandler(test.session, latch);
            consumer = test.listen(queueName, handler);
        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        try {
            // Enqueue one message and commit it.
            test.enqueue(queueName, "ABCDEF");
            test.commitSession();
        } catch (Throwable e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        try {
            // Close the consumer while the handler is rolling back; this is where the deadlock occurs.
            test.waitOnHandler(handler);
            consumer.close();
            test.close();
        } catch (Exception e) {
            // Report shutdown failures instead of silently swallowing them.
            e.printStackTrace();
        }
    }
    /**
     * Open a new connection to the broker and start it.
     */
    public void open() throws Exception {
        // Set the JNDI properties.
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
        properties.put("connectionfactory." + CONNECTION_JNDI_NAME, connUrl);
        try {
            _ctx = new InitialContext(properties);
        } catch (NamingException e) {
            System.err.println("Error Setting up JNDI Context:" + e);
        }
        connection = ((ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME))
                .createConnection();
        // Create a transacted session.
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        emptyProducer = session.createProducer(null);
        emptyProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
    }
    /**
     * Close the connection to the broker.
     */
    public void close() throws Exception {
        try {
            if (session != null) {
                session.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                if (_ctx != null) {
                    _ctx.close();
                }
            }
        }
    }
    /**
     * Creates a new queue and adds it to the queue name to destination map.
     *
     * @return the queue name, or null if no destination was created
     */
    public String createQueue(String queueName) throws Exception {
        Destination destination = session.createQueue(queueName + options);
        if (destination != null) {
            queueNameToDestination.put(queueName, destination);
            return queueName;
        } else {
            System.out.println("Queue Created Null");
            return null;
        }
    }
    /**
     * Creates a listener for the queue and returns the message consumer.
     *
     * @return the consumer the handler was registered on
     */
    public MessageConsumer listen(String p2pConsumer, QpidMqHandler handler)
            throws Exception {
        Destination destination = queueNameToDestination.get(p2pConsumer);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(handler);
        return consumer;
    }
    /**
     * Wait for the handler to signal that it has received a message.
     */
    private void waitOnHandler(QpidMqHandler handler) throws Exception {
        boolean handlerCompleted = handler.latch.await(60, TimeUnit.SECONDS);
        if (!handlerCompleted) {
            System.out.println("The test failed to complete");
        }
    }
    /**
     * Enqueue a message.
     */
    public String enqueue(String p2pConsumer, String payload) throws Exception {
        MapMessage message = session.createMapMessage();
        message.setString("Body", payload);
        Destination destination = queueNameToDestination.get(p2pConsumer);
        emptyProducer.send(destination, message);
        return message.getJMSMessageID();
    }
    /**
     * Commit the session.
     */
    public void commitSession() throws Exception {
        session.commit();
    }
    /**
     * The callback handler.
     */
    public static class QpidMqHandler implements MessageListener {
        final CountDownLatch latch;
        final Session session;
        public QpidMqHandler(Session session, CountDownLatch suppliedLatch) {
            this.session = session;
            this.latch = suppliedLatch;
        }
        @Override
        public final void onMessage(Message arg0) {
            try {
                // Count down the latch, then roll back to simulate a failure.
                latch.countDown();
                session.rollback();
            } catch (Exception x) {
                throw new RuntimeException(x);
            }
        }
    }
}