Hi,

Is there any limit to the number of Topics that can be created on a single
broker? As per my understanding, there is no such limit and it depends on
the system in which the broker is running.

Problem observed which raised the above question :

I tried to simulate an issue which occurs in our production environment when
the no of topics created are very high (25000+). The subscriber test client
does the following :
a) Creates 5 threads each of which will establish a connection with the AMQ
broker using Eclipse Paho's MqttClient V3 lib (so 5 connections are
established)
b) Each thread further creates 5 more threads and these 5 threads use the
same connection to subscribe to different topics in a loop. (so overall
there are 25 threads using 5 connections to subscribe to different topics).

On another machine, I ran another publisher test client which does similar
to subscriber test client, except that it publishes messages in a loop
instead of subscribing to the topics.

When I run these test clients, it runs fine for a few minutes (15-30 mins)
and then starts failing with "Subscribe operation timed out exception" from
MqttClient. I have seen MqttClient code and observed that this exception is
thrown if the client does not receive ACK for subscribe operation within 30
secs. So why is AMQ not able to send ACK within 30 secs? There are no
exceptions in AMQ logs when this happens. After the subscriber test client
failed, publisher test client also fails with "Publish operation timed out".

When the exceptions started, I killed the test clients abruptly and after
sometime saw a bunch of following exceptions in AMQ logs :
2016-03-28 10:36:48,612 | WARN  | Failed to send frame MQTTFrame { type:
PUBCOMP, qos: AT_MOST_ONCE, dup:false } |
org.apache.activemq.transport.mqtt.MQTTProtocolConverter | ActiveMQ NIO
Worker 1255
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)[:1.7.0_95]
        at 
sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)[:1.7.0_95]
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)[:1.7.0_95]
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)[:1.7.0_95]
        at
sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:492)[:1.7.0_95]
        at
org.apache.activemq.transport.nio.NIOOutputStream.write(NIOOutputStream.java:206)[activemq-client-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.nio.NIOOutputStream.flush(NIOOutputStream.java:132)[activemq-client-5.13.1.jar:5.13.1]
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)[:1.7.0_95]
        at
org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:194)[activemq-client-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToMQTT(MQTTTransportFilter.java:120)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToMQTT(MQTTProtocolConverter.java:183)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTPubRel(MQTTProtocolConverter.java:489)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:224)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:94)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$1.onFrame(MQTTCodec.java:65)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec.processCommand(MQTTCodec.java:90)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec.access$400(MQTTCodec.java:26)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$4.parse(MQTTCodec.java:213)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$3.parse(MQTTCodec.java:179)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$2.parse(MQTTCodec.java:138)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$4.parse(MQTTCodec.java:217)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$3.parse(MQTTCodec.java:179)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec$2.parse(MQTTCodec.java:138)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTCodec.parse(MQTTCodec.java:76)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport.processBuffer(MQTTNIOTransport.java:132)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport.serviceRead(MQTTNIOTransport.java:120)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport.access$000(MQTTNIOTransport.java:43)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport$1.onSelect(MQTTNIOTransport.java:72)[activemq-mqtt-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:98)[activemq-client-5.13.1.jar:5.13.1]
        at
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:118)[activemq-client-5.13.1.jar:5.13.1]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[:1.7.0_95]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)[:1.7.0_95]
        at java.lang.Thread.run(Thread.java:745)[:1.7.0_95]

These exceptions make sense because I had killed the test client abruptly.

But after this, AMQ becomes virtually unusable! Further attempts to connect
to the broker and/or subscribe/unsubscribe to/from topics will always
timeout.

I took thread dumps of AMQ process and made following observations :
a) Most of the threads are waiting to acquire read lock on
org.apache.activemq.broker.region.AbstractRegion's destinationsLock
b) The method
org.apache.activemq.broker.region.AbstractRegion.addDestination which had
acquired the write lock did not finish even after 5 minutes! (Check
"ActiveMQ NIO Worker 1323" in ThreadDump1.log and ThreadDump2.log)
c) The method org.apache.activemq.broker.region.TopicRegion.addConsumer
which internally calls the addDestination method mentioned above did not
complete even after 30 minutes!! (Check same thread in ThreadDump2.log and
ThreadDump3.log)

ThreadDump1.log
<http://activemq.2283324.n4.nabble.com/file/n4709985/ThreadDump1.log>  
ThreadDump2.log
<http://activemq.2283324.n4.nabble.com/file/n4709985/ThreadDump2.log>  
ThreadDump3.log
<http://activemq.2283324.n4.nabble.com/file/n4709985/ThreadDump3.log>  

PS : 
a) We have disabled dedicated task runner by setting
-Dorg.apache.activemq.UseDedicatedTaskRunner=false
b) We use KahaDB as persistent store and all messages are set to QOS 1.

I feel I am not using the right AMQ configurations, but I am not able to
figure out which config is missing/wrong.

Any help / pointers would be greatly appreciated!

Thanks,
Shobhana



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/ActiveMQ-with-KahaDB-as-persistent-store-becomes-very-slow-almost-unresponsive-after-creating-large-s-tp4709985.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to