Well, having "real" destinations come and go that quickly is definitely not a typical use case. Admittedly, I have not been following the conversation that closely, but you do seem to have done your homework. I assume temporary destinations wouldn't suit your needs. Have you considered something like Akka? If you have this kind of throughput, do you really need persistence? Is each message a precious commodity or is your application just extremely chatty? I'll try to dig through the conversation...
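In case it helps frame the discussion, here is a rough sketch of the AtomicBoolean idea from the quoted message below: mark a destination as "gone" right away and let a single background thread do the slow store cleanup, so that nothing else has to wait on it. To be clear, this is plain Java and not ActiveMQ code. `LazyDestinationRegistry`, `Entry`, `markForRemoval` and the cleanup callback are all names I made up, and the sketch assumes it is acceptable for a destination to be re-created under the same name while the old one's cleanup is still pending, which a real persistent store may not tolerate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch only; none of these names exist in ActiveMQ.
class LazyDestinationRegistry {
    // One entry per destination. "gone" is flipped before the slow cleanup runs.
    static final class Entry {
        final String name;
        final AtomicBoolean gone = new AtomicBoolean(false);
        Entry(String name) { this.name = name; }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    // The single thread that performs all slow removals, one after another.
    private final ExecutorService reaper = Executors.newSingleThreadExecutor();
    // Stand-in for the expensive persistence-store delete (assumption).
    private final Consumer<String> slowStoreCleanup;

    LazyDestinationRegistry(Consumer<String> slowStoreCleanup) {
        this.slowStoreCleanup = slowStoreCleanup;
    }

    // Returns a live entry, replacing one that is already flagged as gone.
    // This never waits for the reaper thread.
    Entry getOrCreate(String name) {
        return entries.compute(name,
                (k, e) -> (e == null || e.gone.get()) ? new Entry(k) : e);
    }

    // Flags the destination as gone immediately and queues the slow cleanup.
    // Returns false if the destination is unknown or already flagged.
    boolean markForRemoval(String name) {
        Entry e = entries.get(name);
        if (e == null || !e.gone.compareAndSet(false, true)) {
            return false;
        }
        reaper.execute(() -> {
            slowStoreCleanup.accept(name);
            // Remove only this exact entry, not a replacement created meanwhile.
            entries.remove(name, e);
        });
        return true;
    }

    int size() { return entries.size(); }

    // Stops accepting removals and waits briefly for pending cleanups.
    void shutdown() {
        reaper.shutdown();
        try {
            reaper.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Simulate a store delete that takes 50 ms per destination.
        LazyDestinationRegistry registry = new LazyDestinationRegistry(name -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 20; i++) {
            registry.getOrCreate("queue-" + i);
            registry.markForRemoval("queue-" + i);
        }
        long start = System.nanoTime();
        registry.getOrCreate("fresh-queue"); // should not wait for the reaper
        long micros = (System.nanoTime() - start) / 1_000;
        System.out.println("getOrCreate took " + micros + " us with cleanup pending");
        registry.shutdown();
        System.out.println("entries left: " + registry.size());
    }
}
```

Judging by the quoted stack report, the purge thread owns the synchronizer that the addConsumer, addProducer and removeConsumer threads are parked on while it waits for the LevelDB delete to finish. The point of the sketch is only that the slow part can run outside any lock that those calls need. Whether the broker can actually be restructured that way is a separate question.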
On Sunday, February 8, 2015, Kevin Burton <[email protected]> wrote:
> I could probably make one..
>
> I think it would just be a broker with leveldb config, inactive GC of about
> 60 seconds, code which writes 1 message to each of about 5000 queues,
> consumes them, then closes all consumers.
>
> Once the GC kicks in it will take a long time for ActiveMQ to GC all
> the destinations.
>
> Once I get to a state where I have my code at least reliable/stable, I’ll
> try to code up something.
>
> I’m wondering if I’m in anti-pattern territory though. If this is NOT a
> use case which ActiveMQ services, then I should really expect a fix.
>
> But it seems that the performance of GCing destinations should be higher
> than this.
>
> Seems you should be able to GC something like 100-500 destinations per
> second.
>
> On Sun, Feb 8, 2015 at 5:22 PM, James Carman <[email protected]> wrote:
>
> > Do you have an example project which will exhibit the issue reliably?
> >
> > On Sunday, February 8, 2015, Kevin Burton <[email protected]> wrote:
> >
> > > OK. Now I have proof that my theory was correct :)
> > >
> > > If you have a lot of inactive queues that need to be GCd.. these will
> > > block consumers and producers and effectively shut down ActiveMQ during
> > > this process.
> > >
> > > In my usage, we see 30 minutes of activity where ActiveMQ is
> > > unresponsive and effectively dead :-(
> > >
> > > I spent all of last week trying to work around this by allowing
> > > ActiveMQ to GC queues by releasing them more aggressively hoping to
> > > amortize this process but I don’t think that fixed the issue.
> > >
> > > This is a report of a ‘stack report’ tool I wrote 5 or so years ago
> > > which we use internally. It runs jstack then builds a graph looking at
> > > the lock IDs and then ranks them accordingly, sorting by hot spot
> > > descending.
> > >
> > > So here we have 13 threads, which have 12 inbound threads blocking them
> > > (3 distinct stack traces)
> > >
> > > Note the addConsumer and addProducer… so no new consumers can be
> > > created while this process happens.
> > >
> > > Is this mutex lock strictly needed? Perhaps one strategy is to have one
> > > thread which does the removeDestination and then just an AtomicBoolean
> > > flagging this reference as “gone” and pending removal.
> > >
> > > the main thing I’m worried about is that I’m the only one seeing this
> > > issue …
> > >
> > > I don’t really understand why this is happening though and why it’s so
> > > slow.
> > >
> > > I’m looking at my log right now and it has about 3200 queues that it’s
> > > GCing at the moment.
> > >
> > > It looks like it’s able to do about 10 per minute. So obviously this is
> > > just going to take a long time.
> > >
> > > So I think my main idea is to move to an all memory ActiveMQ broker for
> > > now. I can just get good at QUICKLY rebuilding the queue during total
> > > queue failure. This is just a short term work around though..
> > >
> > > I’ll either have to figure out a way to fix this, completely redesign
> > > my app (which will be no fun), build my own queue server with special
> > > semantics, or implement some sort of snapshotting support and logging.
> > >
> > > If I were to build this myself doing logs and checkpoints, while
> > > keeping the whole thing in memory, I think that would be faster. But of
> > > course that would take more time :-(
> > >
> > > -------------
> > > Threads: 13 , Unique waiting threads: 3 , Total waiting threads: 12
> > > -------------
> > > java.lang.Thread.State: WAITING (on object monitor)
> > >     at java.lang.Object.wait(Native Method)
> > >     at java.lang.Object.wait(Object.java:503)
> > >     at org.fusesource.hawtdispatch.SettableFuture$class.await(Future.scala:71)
> > >     - locked <0xXXXXXXXXXXXXXXXX> (a org.fusesource.hawtdispatch.SettableFuture$mutex$)
> > >     at org.fusesource.hawtdispatch.Future$$anon$1.await(Future.scala:122)
> > >     at org.fusesource.hawtdispatch.Future$class.apply(Future.scala:28)
> > >     at org.fusesource.hawtdispatch.Future$$anon$1.apply(Future.scala:122)
> > >     at org.fusesource.hawtdispatch.package$RichExecutorTrait$class.sync(hawtdispatch.scala:106)
> > >     at org.fusesource.hawtdispatch.package$RichExecutor.sync(hawtdispatch.scala:142)
> > >     at org.apache.activemq.leveldb.DBManager.destroyQueueStore(DBManager.scala:769)
> > >     at org.apache.activemq.leveldb.LevelDBStore$$anonfun$removeQueueMessageStore$1.apply(LevelDBStore.scala:588)
> > >     at org.apache.activemq.leveldb.LevelDBStore$$anonfun$removeQueueMessageStore$1.apply(LevelDBStore.scala:587)
> > >     at scala.Option.foreach(Option.scala:245)
> > >     at org.apache.activemq.leveldb.LevelDBStore.removeQueueMessageStore(LevelDBStore.scala:587)
> > >     - locked <0xXXXXXXXXXXXXXXXX> (a org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter)
> > >     at org.apache.activemq.broker.region.DestinationFactoryImpl.removeDestination(DestinationFactoryImpl.java:113)
> > >     at org.apache.activemq.broker.region.AbstractRegion.dispose(AbstractRegion.java:592)
> > >     at org.apache.activemq.broker.region.AbstractRegion.removeDestination(AbstractRegion.java:222)
> > >     at org.apache.activemq.broker.jmx.ManagedQueueRegion.removeDestination(ManagedQueueRegion.java:62)
> > >     at org.apache.activemq.broker.region.RegionBroker.removeDestination(RegionBroker.java:340)
> > >     at org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >     at org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >     at org.apache.activemq.advisory.AdvisoryBroker.removeDestination(AdvisoryBroker.java:212)
> > >     at org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >     at org.apache.activemq.broker.BrokerFilter.removeDestination(BrokerFilter.java:172)
> > >     at org.apache.activemq.broker.MutableBrokerFilter.removeDestination(MutableBrokerFilter.java:177)
> > >     at org.apache.activemq.broker.region.RegionBroker.purgeInactiveDestinations(RegionBroker.java:860)
> > >     at org.apache.activemq.broker.region.RegionBroker$1.run(RegionBroker.java:109)
> > >     at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
> > >     at java.util.TimerThread.mainLoop(Timer.java:555)
> > >     at java.util.TimerThread.run(Timer.java:505)
> > >
> > > Lockable ownable synchronizers:
> > >     - <0x00000005cdb4ac00>
> > >     - <0x00000005ce152b78>
> > >
> > > 5 waiting threads:
> > > --------
> > > java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000005ce152b78> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
> > >     at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
> > >     at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:402)
> > >     at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:244)
> > >     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >     at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:101)
> > >     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
> > >     at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:102)
> > >     at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:618)
> > >     at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
> > >     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
> > >     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
> > >     at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> > >     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> > >     at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> > >     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> > >     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> > >     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > 1 waiting threads:
> > > --------
> > > java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000005ce152b78> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
> > >     at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
> > >     at org.apache.activemq.broker.region.RegionBroker.addProducer(RegionBroker.java:371)
> > >     at org.apache.activemq.broker.jmx.ManagedRegionBroker.addProducer(ManagedRegionBroker.java:267)
> > >     at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:102)
> > >     at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:102)
> > >     at org.apache.activemq.advisory.AdvisoryBroker.addProducer(AdvisoryBroker.java:172)
> > >     at org.apache.activemq.broker.CompositeDestinationBroker.addProducer(CompositeDestinationBroker.java:56)
> > >     at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:102)
> > >     at org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:107)
> > >     at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:565)
> > >     at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:108)
> > >     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
> > >     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
> > >     at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> > >     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> > >     at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> > >     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> > >     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> > >     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > 6 waiting threads:
> > > --------
> > > java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for <0x00000005ce152b78> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
> > >     at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
> > >     at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:413)
> > >     at org.apache.activemq.broker.jmx.ManagedRegionBroker.removeConsumer(ManagedRegionBroker.java:262)
> > >     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >     at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:263)
> > >     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
> > >     at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:137)
> > >     at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:650)
> > >     at org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:689)
> > >     at org.apache.activemq.broker.TransportConnection.processRemoveConnection(TransportConnection.java:801)
> > >     - locked <0xXXXXXXXXXXXXXXXX> (a org.apache.activemq.broker.jmx.ManagedTransportConnection)
> > >     at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:1138)
> > >     at org.apache.activemq.broker.TransportConnection$4.run(TransportConnection.java:1068)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > > --
> > > Founder/CEO Spinn3r.com
> > > Location: *San Francisco, CA*
> > > blog: http://burtonator.wordpress.com
> > > … or check out my Google+ profile
> > > <https://plus.google.com/102718274791889610666/posts>
> > > <http://spinn3r.com>
