On 14/05/18 20:13, Michael Ivanov wrote:
I am implementing message broadcasting using a fanout exchange. Several processes are subscribed to the exchange and each of them can send messages to it. Is it possible to prevent the sender from receiving its own message?
AMQP 0-10 had a concept called no-local that does what you describe. For AMQP 1.0 there is a filter defined that gives the same functionality [1]. It is an extension rather than part of the core specification, but it uses a designed extension point. Unfortunately it is also a little cumbersome to use. Attached is a 1.0 example using the filter. I've only tested it against the C++ broker, but I believe the JMS client uses this filter as well, so it should work on the Qpid Java broker and likely even on ActiveMQ.
[1] https://github.com/apache/qpid/blob/trunk/qpid/specs/apache-filters.xml#L128
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import print_function, unicode_literals
import optparse
from proton import Described, Message, symbol, Url
from proton.reactor import Container, Filter
from proton.handlers import MessagingHandler


class Recv(MessagingHandler):
    def __init__(self, url, name):
        super(Recv, self).__init__()
        self.url = Url(url)
        self.name = name
        self.sent = 1

    def on_start(self, event):
        # Apply the requested container id (if any) before connecting
        if self.name:
            event.container.container_id = self.name
        conn = event.container.connect(self.url)
        self.sender = event.container.create_sender(conn, self.url.path)
        self.container = event.container
        # Attach the no-local filter to the receiving link's source
        event.container.create_receiver(
            conn, self.url.path,
            options=Filter({symbol("foo"): Described(
                symbol("apache.org:no-local-filter:list"), "bar")}))
        event.container.schedule(5, self)

    def on_message(self, event):
        print('recv: %s' % event.message.body)

    def on_timer_task(self, event):
        # Send a numbered message every 5 seconds
        message = '%s-%i' % (self.container.container_id, self.sent)
        self.sent += 1
        self.sender.send(Message(body=message))
        self.container.schedule(5, self)
        print('sent: %s' % message)


parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/amq.fanout",
                  help="address from which messages are received (default %default)")
parser.add_option("-n", "--name", type="string", default=None,
                  help="container id to use")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address, opts.name)).run()
except KeyboardInterrupt:
    pass
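
As a rough mental model of what the no-local filter asks the broker to do, here is a toy in-memory sketch. It does not use proton and does not talk to a broker. FanoutExchange, subscribe and publish are made-up names for illustration, and matching the sender by container id is an assumption about how a broker might identify "local" messages, not a description of any particular broker's implementation.

```python
class FanoutExchange(object):
    """Toy in-memory stand-in for a fanout exchange (hypothetical, not a broker)."""

    def __init__(self):
        # Each entry is (container_id, no_local, inbox)
        self.subscribers = []

    def subscribe(self, container_id, no_local=False):
        """Register a subscriber and return the list its messages land in."""
        inbox = []
        self.subscribers.append((container_id, no_local, inbox))
        return inbox

    def publish(self, sender_id, body):
        """Copy the message to every subscriber, honouring no-local."""
        for container_id, no_local, inbox in self.subscribers:
            # Assumption: "local" means same container id as the sender
            if no_local and container_id == sender_id:
                continue
            inbox.append(body)


exchange = FanoutExchange()
inbox_a = exchange.subscribe("a", no_local=True)
inbox_b = exchange.subscribe("b", no_local=True)

exchange.publish("a", "a-1")
exchange.publish("b", "b-1")

print(inbox_a)  # ['b-1']
print(inbox_b)  # ['a-1']
```

Each process sees only the other's message. In the attached script the same effect is requested from the real broker by passing the Filter option to create_receiver.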
