Hi Eric I think your understanding of JMS is correct. Message listeners on independent sessions should provide for concurrent message delivery [1]. Application code blocking one onMessage should not affect delivery to another on a separate session. As you have observed, this is not the case with the current AMQP 1.0 client (qpid-amqp-1-0-client-jms-x.xx). An internal connection scoped lock is shared by the message delivery paths. To workaround this issue, you could use independent connections. If you wish, raise a Jira and submit a patch. To the best of my knowledge, most attention in the AMQP1.0 JMS client area is being directed towards the new JMS 2.0 client.
Hope this helps, Keith. [1] JMS 1.1 Spec, 4.17 Concurrent Message Delivery On 28 January 2015 at 10:57, Erik Aschenbrenner <[email protected]> wrote: > > Ok, here are the missing details: > > I'm using the Java Qpid JMS client for AMQP 1.0 (version 0.30). > > The broker I'm connecting to also uses AMQP 1.0 and is from Red Hat (as far > as I know). Actually I don't have other informations about the broker than > the AMQP version. > > To make the problem clearer here is a more simpler setup of the problem: > > I have 2 topic message listeners listen to different topics. If listener 1 > receives a message it calls Thread.sleep(Long.MAX_VALUE) in it's > onMessage() > method in my test case. > From now on listener 2 never gets a message from the AMQP broker again. > > I attached the stack trace at the end. > > Thanks in advance, > Erik > > PS: I changed my code now so that listener 1 is not not blocking anymore > but > nevertheless I would like to know what is the problem here because I > thought > that the > message listeners should be independent. > > > > 2015-01-28 11:40:12 > Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode): > > "RMI TCP Connection(2)-192.168.3.33" daemon prio=6 tid=0x0000000011f6e000 > nid=0x21bc runnable [0x00000000148af000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.read(Unknown Source) > at java.net.SocketInputStream.read(Unknown Source) > at java.io.BufferedInputStream.fill(Unknown Source) > at java.io.BufferedInputStream.read(Unknown Source) > - locked <0x00000007b03e6e68> (a java.io.BufferedInputStream) > at java.io.FilterInputStream.read(Unknown Source) > at sun.rmi.transport.tcp.TCPTransport.handleMessages(Unknown > Source) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(Unknown > Source) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - <0x00000007afe904a0> (a > java.util.concurrent.ThreadPoolExecutor$Worker) > > "JMX server connection timeout 18" daemon prio=6 tid=0x0000000011f73000 > nid=0x2128 in Object.wait() [0x00000000146ef000] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x00000007b01435a8> (a [I) > at > com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(Unknown > Source) > - locked <0x00000007b01435a8> (a [I) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "RMI Scheduler(0)" daemon prio=6 tid=0x0000000012308000 nid=0x2290 waiting > on condition [0x00000000144ff000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000007afe49618> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source) > at > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) > at > > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown > Source) > at > > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown > Source) > at java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "RMI TCP Connection(1)-192.168.3.33" daemon prio=6 tid=0x000000001217d800 > nid=0x2138 runnable [0x00000000143be000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.read(Unknown Source) > at java.net.SocketInputStream.read(Unknown Source) > at java.io.BufferedInputStream.fill(Unknown Source) > at java.io.BufferedInputStream.read(Unknown Source) > - locked <0x00000007b00ef710> (a java.io.BufferedInputStream) > at java.io.FilterInputStream.read(Unknown Source) > at sun.rmi.transport.tcp.TCPTransport.handleMessages(Unknown > Source) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(Unknown > Source) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(Unknown Source) > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - <0x00000007afe8e0d0> (a > java.util.concurrent.ThreadPoolExecutor$Worker) > > "RMI TCP Accept-0" daemon prio=6 tid=0x0000000013158800 nid=0x9c4 runnable > [0x000000001421e000] > java.lang.Thread.State: RUNNABLE > at java.net.DualStackPlainSocketImpl.accept0(Native Method) > at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) > at java.net.AbstractPlainSocketImpl.accept(Unknown Source) > at java.net.PlainSocketImpl.accept(Unknown Source) > - locked <0x00000007afe4ee58> (a java.net.SocksSocketImpl) > at java.net.ServerSocket.implAccept(Unknown Source) > at java.net.ServerSocket.accept(Unknown Source) > at > sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(Unknown > Source) > at > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(Unknown > Source) > at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(Unknown > Source) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "Thread-3" prio=6 tid=0x0000000011f5b000 nid=0x21b8 waiting on condition > [0x0000000012fbe000] > java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > > de.amqptest.connection.MasterDataRetrievalTest$1.onMessage(MasterDataRetrievalTest.java:306) > at > > org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942) > - locked <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "Thread-2" prio=6 tid=0x0000000011f7d000 nid=0x1554 in Object.wait() > [0x0000000012d9f000] > java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at java.lang.Object.wait(Object.java:503) > at > > org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:901) > - locked <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "Thread-1" prio=6 tid=0x0000000011e9b000 nid=0x1804 in Object.wait() > [0x0000000012baf000] > java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at java.lang.Object.wait(Object.java:503) > at > > org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:901) > - locked <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "QpidConnectionInputThread-0" daemon prio=6 tid=0x000000001190f000 > nid=0x23d4 waiting for monitor entry [0x0000000011cef000] > java.lang.Thread.State: BLOCKED (on object monitor) > at > > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint.closedForInput(ConnectionEndpoint.java:730) > - waiting to lock <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at > org.apache.qpid.amqp_1_0.framing.FrameHandler.isDone(FrameHandler.java:328) > at > > org.apache.qpid.amqp_1_0.framing.ConnectionHandler.isDone(ConnectionHandler.java:83) > at > > org.apache.qpid.amqp_1_0.client.TCPTransportProvier.doRead(TCPTransportProvier.java:215) > at > > org.apache.qpid.amqp_1_0.client.TCPTransportProvier.access$000(TCPTransportProvier.java:43) > at > > org.apache.qpid.amqp_1_0.client.TCPTransportProvier$1.run(TCPTransportProvier.java:158) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "QpidConnectionOutputThread-0" daemon prio=6 tid=0x00000000116b2000 > nid=0x2364 waiting for monitor entry [0x00000000128cf000] > java.lang.Thread.State: BLOCKED (on object monitor) > at > > org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput.getNextFrame(ConnectionHandler.java:207) > - waiting to lock <0x0000000700a0e518> (a > org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint) > at > > org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput$2.getNextFrame(ConnectionHandler.java:125) > at > > org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameToBytesSourceAdapter.getBytes(ConnectionHandler.java:317) > at > > org.apache.qpid.amqp_1_0.framing.ConnectionHandler$SequentialBytesSource.getBytes(ConnectionHandler.java:406) > - locked <0x0000000700a1a570> (a > org.apache.qpid.amqp_1_0.framing.ConnectionHandler$SequentialBytesSource) > at > > org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler.run(ConnectionHandler.java:508) > at java.lang.Thread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "Service Thread" daemon prio=6 tid=0x000000000f94e800 nid=0x1c8c runnable > [0x0000000000000000] > java.lang.Thread.State: RUNNABLE > > Locked ownable synchronizers: > - None > > "C2 CompilerThread1" daemon prio=10 tid=0x000000000f94e000 nid=0x4dc > waiting > on condition [0x0000000000000000] > java.lang.Thread.State: RUNNABLE > > Locked ownable synchronizers: > - None > > "C2 CompilerThread0" daemon prio=10 tid=0x000000000f949000 nid=0x1080 > waiting on condition [0x0000000000000000] > java.lang.Thread.State: RUNNABLE > > Locked ownable synchronizers: > - None > > "Attach Listener" daemon prio=10 tid=0x000000000f947000 nid=0x1e00 waiting > on condition [0x0000000000000000] > java.lang.Thread.State: RUNNABLE > > Locked ownable synchronizers: > - None > > "Signal Dispatcher" daemon prio=10 tid=0x000000000f940000 nid=0xff4 > runnable > [0x0000000000000000] > java.lang.Thread.State: RUNNABLE > > Locked ownable synchronizers: > - None > > "Finalizer" daemon prio=8 tid=0x000000000f8e7000 nid=0x1a88 in > Object.wait() > [0x0000000010ddf000] > java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x0000000700a11190> (a > java.lang.ref.ReferenceQueue$Lock) > at java.lang.ref.ReferenceQueue.remove(Unknown Source) > - locked <0x0000000700a11190> (a java.lang.ref.ReferenceQueue$Lock) > at java.lang.ref.ReferenceQueue.remove(Unknown Source) > at java.lang.ref.Finalizer$FinalizerThread.run(Unknown Source) > > Locked ownable synchronizers: > - None > > "Reference Handler" daemon prio=10 tid=0x000000000f8e0000 nid=0x170 in > Object.wait() [0x0000000010b1f000] > java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0x0000000700a10ba8> (a java.lang.ref.Reference$Lock) > at java.lang.Object.wait(Object.java:503) > at java.lang.ref.Reference$ReferenceHandler.run(Unknown Source) > - locked <0x0000000700a10ba8> (a java.lang.ref.Reference$Lock) > > Locked ownable synchronizers: > - None > > "main" prio=6 tid=0x00000000024be800 nid=0x1354 waiting on condition > [0x00000000027af000] > java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > > de.amqptest.connection.MasterDataRetrievalTest.doWait(MasterDataRetrievalTest.java:179) > at > > de.amqptest.connection.MasterDataRetrievalTest.run(MasterDataRetrievalTest.java:141) > at > > de.amqptest.connection.MasterDataRetrievalTest.main(MasterDataRetrievalTest.java:85) > > Locked ownable synchronizers: > - None > > "VM Thread" prio=10 tid=0x000000000f8d8800 nid=0xd70 runnable > > "GC task thread#0 (ParallelGC)" prio=6 tid=0x000000000250d000 nid=0x10f4 > runnable > > "GC task thread#1 (ParallelGC)" prio=6 tid=0x000000000250f000 nid=0x2310 > runnable > > "GC task thread#2 (ParallelGC)" prio=6 tid=0x0000000002510800 nid=0x1ba4 > runnable > > "GC task thread#3 (ParallelGC)" prio=6 tid=0x0000000002512000 nid=0x19a4 > runnable > > "VM Periodic Task Thread" prio=10 tid=0x000000000f959000 nid=0x114c waiting > on condition > > JNI global references: 169 > > > > > > > > -- > View this message in context: > http://qpid.2158936.n2.nabble.com/Qpid-JMS-Message-listener-blocks-other-message-listener-on-different-session-tp7618696p7618985.html > Sent from the Apache Qpid users mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > >
