Hi,
I'm struggling with AMQ 5.9.0 to achieve my goals: durable virtual
topics with selectors on STOMP, with a network of four brokers
(connected with SSL connectors, with ACLs and certificate
authentication/authorization both on client and server side).
In plain English: I want to publish messages to a (virtual) topic
(there are actually several, but I think that's irrelevant here) from
machines spread across many data centers to AMQ servers in two DCs
(2x2 machines, fully meshed). Any publisher or consumer can connect to
any of the servers.
The messages in the topic have several headers and I would like to
filter them into durable queues.
So, for example, Publisher1..10 publish Type1..10 messages, while
Consumer1 consumes only Type1, Consumer2 consumes Type1-3, and so on,
all from durable queues.
To protect the queues, I would like to deliver only the messages
matching the consumer's selector, and to publish each message with a
TTL set, so that if the consumer for a given queue is away for an
extended period of time, the messages are dropped.
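To make the TTL part concrete, here is a minimal sketch of the headers
a publisher could send with each message. It assumes ActiveMQ's STOMP
"expires" header (an absolute expiry time in milliseconds since the
epoch) and its "persistent" header. The helper name
build_publish_headers and its parameters are made up for illustration;
Xsystem and Xstatustype are the message headers I filter on.

```python
import time


def build_publish_headers(destination, system, statustype,
                          ttl_seconds, now=None):
    """Build STOMP SEND headers for one message (hypothetical helper).

    'expires' is an absolute timestamp in milliseconds, so the TTL is
    added to the current time rather than sent as a duration.
    """
    if now is None:
        now = time.time()
    return {
        'destination': destination,
        'persistent': 'true',  # ask the broker to store the message
        'expires': str(int((now + ttl_seconds) * 1000)),
        'Xsystem': system,
        'Xstatustype': statustype,
    }


# Example: a message that should expire 60 seconds after publishing.
headers = build_publish_headers('/topic/VirtualTopic.radius',
                                'wired', 'STOP', ttl_seconds=60)
```

The resulting dict would go into the headers of the STOMP SEND frame;
the exact send() call depends on the client library version.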
Seems to be fun, but I can't get it to work.
So far I could get only two behaviours, both with just one machine and
one publisher/consumer (one queue):
- everything works nicely, the queue gets only the relevant messages,
but it's not durable. If there is a consumer, it gets the messages,
but if nobody listens, nothing gets to the queue.
- the queue gets all of the messages (not just the ones the selector
would allow) and is durable. However, the consumer gets the messages
in bursts: around 130 messages per second, then nothing for about a
minute, then another 130 messages and nothing for a minute, even
though the queue is full of messages.
The configuration I use is:
http://pastebin.com/d8rkB0Yc
The difference between the two behaviours described above is the
selectorAware=true setting, which is commented out in the pastebin
config.
I use a Python client, publish to /topic/VirtualTopic.radius and
consume from /queue/Consumer.radiusmq.VirtualTopic.radius with the
following code snippet:
conn.connect(headers={'client-id': 'radiusmq'})
conn.subscribe(headers={
    'destination': '/queue/Consumer.radiusmq.VirtualTopic.radius',
    'ack': 'client',
    'id': 1,
    'activemq.prefetchSize': 1000,
    'selector': "Xsystem = 'wired' AND ("
                "Xstatustype = 'STOP' OR "
                "Xstatustype = 'INTERIM_UPDATE')",
})
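Since each consumer needs its own filter, the selector string could
also be generated rather than typed by hand. A minimal sketch, where
build_selector is a hypothetical helper (not part of any STOMP
library) and the only assumption is the usual SQL-92 style quoting of
string literals in selectors:

```python
def build_selector(system, statustypes):
    """Build a selector matching one system and any listed status type."""

    def quote(value):
        # Selector string literals use single quotes; an embedded
        # single quote is escaped by doubling it.
        return "'" + value.replace("'", "''") + "'"

    alternatives = ' OR '.join(
        'Xstatustype = ' + quote(s) for s in statustypes)
    return 'Xsystem = ' + quote(system) + ' AND (' + alternatives + ')'


# Reproduces the selector used in the subscribe call.
selector = build_selector('wired', ['STOP', 'INTERIM_UPDATE'])
```

The value of selector would then replace the hand-written string in
the subscribe headers.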
I have some graphs of the latter case, if that helps. However, I would
prefer to get the former working, but with a durable queue.
Thanks,