Hi Erik

I think your understanding of JMS is correct. Message listeners on
independent sessions should provide for concurrent message delivery [1].
Application code blocking in one onMessage() should not affect delivery to
another listener on a separate session.
As you have observed, this is not the case with the current AMQP 1.0 client
(qpid-amqp-1-0-client-jms-x.xx). An internal connection-scoped lock is
shared by the message delivery paths: in your thread dump, the dispatcher
thread sleeping in onMessage() holds the ConnectionEndpoint monitor that the
connection's input and output threads are blocked on. To work around this
issue, you could use independent connections, one per listener. If you
wish, raise a Jira and submit a patch. To the best of my knowledge, most
attention in the AMQP 1.0 JMS client area is being directed towards the new
JMS 2.0 client.
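
To illustrate the effect, here is a minimal sketch. It is a simplified model
only, not the Qpid client's actual code: the class SharedLockDemo, the
Listener interface and the dispatch() helper are all made up for this
example, and a plain Object stands in for the connection-scoped lock. Each
"dispatcher" calls its listener while holding the lock it was given, so a
listener that blocks on a shared lock starves the other, while separate
locks (the analogue of separate connections) do not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model only: hypothetical names, not the Qpid client's code.
class SharedLockDemo {

    /** Stand-in for a JMS MessageListener. */
    interface Listener {
        void onMessage(String message) throws InterruptedException;
    }

    /**
     * Starts a daemon thread that delivers one message while holding
     * connectionLock, mimicking a dispatcher that invokes onMessage()
     * inside a connection-scoped synchronized block.
     */
    static Thread dispatch(Object connectionLock, Listener listener, String message) {
        Thread t = new Thread(() -> {
            synchronized (connectionLock) {
                try {
                    listener.onMessage(message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /**
     * Listener 1 blocks in onMessage(). Returns true if listener 2 still
     * receives its message within waitMillis.
     */
    static boolean secondListenerDelivered(Object lock1, Object lock2, long waitMillis)
            throws InterruptedException {
        CountDownLatch firstEntered = new CountDownLatch(1);
        CountDownLatch secondDelivered = new CountDownLatch(1);

        Thread blocked = dispatch(lock1, m -> {
            firstEntered.countDown();
            Thread.sleep(Long.MAX_VALUE); // same call as in the reported test case
        }, "topic-1 message");

        firstEntered.await(); // listener 1 is now inside onMessage(), holding lock1
        dispatch(lock2, m -> secondDelivered.countDown(), "topic-2 message");

        boolean delivered = secondDelivered.await(waitMillis, TimeUnit.MILLISECONDS);
        blocked.interrupt(); // wake listener 1 so its lock is released
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        Object sharedConnection = new Object();
        System.out.println("shared connection lock: listener 2 delivered = "
                + secondListenerDelivered(sharedConnection, sharedConnection, 500));
        System.out.println("separate connection locks: listener 2 delivered = "
                + secondListenerDelivered(new Object(), new Object(), 500));
    }
}
```

With a shared lock the second listener should not be called within the
timeout; with separate locks it should be. In real code the equivalent
change would be to create one JMS Connection per listener rather than one
Session per listener on a shared Connection.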

Hope this helps, Keith.

[1] JMS 1.1 Spec, 4.17 Concurrent Message Delivery

On 28 January 2015 at 10:57, Erik Aschenbrenner wrote:

>
> Ok, here are the missing details:
>
> I'm using the Java Qpid JMS client for AMQP 1.0 (version 0.30).
>
> The broker I'm connecting to also uses AMQP 1.0 and is from Red Hat (as far
> as I know). Actually, I don't have any information about the broker other
> than the AMQP version.
>
> To make the problem clearer, here is a simpler setup:
>
> I have 2 topic message listeners listening to different topics. In my test
> case, when listener 1 receives a message, it calls
> Thread.sleep(Long.MAX_VALUE) in its onMessage() method.
> From then on, listener 2 never gets a message from the AMQP broker again.
>
> I attached the stack trace at the end.
>
> Thanks in advance,
> Erik
>
> PS: I have now changed my code so that listener 1 no longer blocks, but I
> would still like to know what the problem is, because I thought that the
> message listeners should be independent.
>
>
>
> 2015-01-28 11:40:12
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode):
>
> "RMI TCP Connection(2)-192.168.3.33" daemon prio=6 tid=0x0000000011f6e000
> nid=0x21bc runnable [0x00000000148af000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.SocketInputStream.socketRead0(Native Method)
>         at java.net.SocketInputStream.read(Unknown Source)
>         at java.net.SocketInputStream.read(Unknown Source)
>         at java.io.BufferedInputStream.fill(Unknown Source)
>         at java.io.BufferedInputStream.read(Unknown Source)
>         - locked <0x00000007b03e6e68> (a java.io.BufferedInputStream)
>         at java.io.FilterInputStream.read(Unknown Source)
>         at sun.rmi.transport.tcp.TCPTransport.handleMessages(Unknown
> Source)
>         at
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(Unknown
> Source)
>         at
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(Unknown Source)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - <0x00000007afe904a0> (a
> java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "JMX server connection timeout 18" daemon prio=6 tid=0x0000000011f73000
> nid=0x2128 in Object.wait() [0x00000000146ef000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x00000007b01435a8> (a [I)
>         at
> com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(Unknown
> Source)
>         - locked <0x00000007b01435a8> (a [I)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "RMI Scheduler(0)" daemon prio=6 tid=0x0000000012308000 nid=0x2290 waiting
> on condition [0x00000000144ff000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007afe49618> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)
>         at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
> Source)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown
> Source)
>         at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown
> Source)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "RMI TCP Connection(1)-192.168.3.33" daemon prio=6 tid=0x000000001217d800
> nid=0x2138 runnable [0x00000000143be000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.SocketInputStream.socketRead0(Native Method)
>         at java.net.SocketInputStream.read(Unknown Source)
>         at java.net.SocketInputStream.read(Unknown Source)
>         at java.io.BufferedInputStream.fill(Unknown Source)
>         at java.io.BufferedInputStream.read(Unknown Source)
>         - locked <0x00000007b00ef710> (a java.io.BufferedInputStream)
>         at java.io.FilterInputStream.read(Unknown Source)
>         at sun.rmi.transport.tcp.TCPTransport.handleMessages(Unknown
> Source)
>         at
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(Unknown
> Source)
>         at
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(Unknown Source)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - <0x00000007afe8e0d0> (a
> java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "RMI TCP Accept-0" daemon prio=6 tid=0x0000000013158800 nid=0x9c4 runnable
> [0x000000001421e000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.DualStackPlainSocketImpl.accept0(Native Method)
>         at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
>         at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
>         at java.net.PlainSocketImpl.accept(Unknown Source)
>         - locked <0x00000007afe4ee58> (a java.net.SocksSocketImpl)
>         at java.net.ServerSocket.implAccept(Unknown Source)
>         at java.net.ServerSocket.accept(Unknown Source)
>         at
> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(Unknown
> Source)
>         at
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(Unknown
> Source)
>         at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(Unknown
> Source)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "Thread-3" prio=6 tid=0x0000000011f5b000 nid=0x21b8 waiting on condition [0x0000000012fbe000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at de.amqptest.connection.MasterDataRetrievalTest$1.onMessage(MasterDataRetrievalTest.java:306)
>         at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942)
>         - locked <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "Thread-2" prio=6 tid=0x0000000011f7d000 nid=0x1554 in Object.wait() [0x0000000012d9f000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at java.lang.Object.wait(Object.java:503)
>         at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:901)
>         - locked <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "Thread-1" prio=6 tid=0x0000000011e9b000 nid=0x1804 in Object.wait() [0x0000000012baf000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at java.lang.Object.wait(Object.java:503)
>         at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:901)
>         - locked <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "QpidConnectionInputThread-0" daemon prio=6 tid=0x000000001190f000 nid=0x23d4 waiting for monitor entry [0x0000000011cef000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint.closedForInput(ConnectionEndpoint.java:730)
>         - waiting to lock <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at org.apache.qpid.amqp_1_0.framing.FrameHandler.isDone(FrameHandler.java:328)
>         at org.apache.qpid.amqp_1_0.framing.ConnectionHandler.isDone(ConnectionHandler.java:83)
>         at org.apache.qpid.amqp_1_0.client.TCPTransportProvier.doRead(TCPTransportProvier.java:215)
>         at org.apache.qpid.amqp_1_0.client.TCPTransportProvier.access$000(TCPTransportProvier.java:43)
>         at org.apache.qpid.amqp_1_0.client.TCPTransportProvier$1.run(TCPTransportProvier.java:158)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "QpidConnectionOutputThread-0" daemon prio=6 tid=0x00000000116b2000 nid=0x2364 waiting for monitor entry [0x00000000128cf000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput.getNextFrame(ConnectionHandler.java:207)
>         - waiting to lock <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
>         at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput$2.getNextFrame(ConnectionHandler.java:125)
>         at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameToBytesSourceAdapter.getBytes(ConnectionHandler.java:317)
>         at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$SequentialBytesSource.getBytes(ConnectionHandler.java:406)
>         - locked <0x0000000700a1a570> (a org.apache.qpid.amqp_1_0.framing.ConnectionHandler$SequentialBytesSource)
>         at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler.run(ConnectionHandler.java:508)
>         at java.lang.Thread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "Service Thread" daemon prio=6 tid=0x000000000f94e800 nid=0x1c8c runnable
> [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
>    Locked ownable synchronizers:
>         - None
>
> "C2 CompilerThread1" daemon prio=10 tid=0x000000000f94e000 nid=0x4dc
> waiting
> on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
>    Locked ownable synchronizers:
>         - None
>
> "C2 CompilerThread0" daemon prio=10 tid=0x000000000f949000 nid=0x1080
> waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
>    Locked ownable synchronizers:
>         - None
>
> "Attach Listener" daemon prio=10 tid=0x000000000f947000 nid=0x1e00 waiting
> on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
>    Locked ownable synchronizers:
>         - None
>
> "Signal Dispatcher" daemon prio=10 tid=0x000000000f940000 nid=0xff4
> runnable
> [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>
>    Locked ownable synchronizers:
>         - None
>
> "Finalizer" daemon prio=8 tid=0x000000000f8e7000 nid=0x1a88 in
> Object.wait()
> [0x0000000010ddf000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x0000000700a11190> (a
> java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(Unknown Source)
>         - locked <0x0000000700a11190> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(Unknown Source)
>         at java.lang.ref.Finalizer$FinalizerThread.run(Unknown Source)
>
>    Locked ownable synchronizers:
>         - None
>
> "Reference Handler" daemon prio=10 tid=0x000000000f8e0000 nid=0x170 in
> Object.wait() [0x0000000010b1f000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x0000000700a10ba8> (a java.lang.ref.Reference$Lock)
>         at java.lang.Object.wait(Object.java:503)
>         at java.lang.ref.Reference$ReferenceHandler.run(Unknown Source)
>         - locked <0x0000000700a10ba8> (a java.lang.ref.Reference$Lock)
>
>    Locked ownable synchronizers:
>         - None
>
> "main" prio=6 tid=0x00000000024be800 nid=0x1354 waiting on condition [0x00000000027af000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at de.amqptest.connection.MasterDataRetrievalTest.doWait(MasterDataRetrievalTest.java:179)
>         at de.amqptest.connection.MasterDataRetrievalTest.run(MasterDataRetrievalTest.java:141)
>         at de.amqptest.connection.MasterDataRetrievalTest.main(MasterDataRetrievalTest.java:85)
>
>    Locked ownable synchronizers:
>         - None
>
> "VM Thread" prio=10 tid=0x000000000f8d8800 nid=0xd70 runnable
>
> "GC task thread#0 (ParallelGC)" prio=6 tid=0x000000000250d000 nid=0x10f4
> runnable
>
> "GC task thread#1 (ParallelGC)" prio=6 tid=0x000000000250f000 nid=0x2310
> runnable
>
> "GC task thread#2 (ParallelGC)" prio=6 tid=0x0000000002510800 nid=0x1ba4
> runnable
>
> "GC task thread#3 (ParallelGC)" prio=6 tid=0x0000000002512000 nid=0x19a4
> runnable
>
> "VM Periodic Task Thread" prio=10 tid=0x000000000f959000 nid=0x114c waiting
> on condition
>
> JNI global references: 169
>
>
>
>
>
>
>
> --
> View this message in context:
> http://qpid.2158936.n2.nabble.com/Qpid-JMS-Message-listener-blocks-other-message-listener-on-different-session-tp7618696p7618985.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.