Hi! I am trying to implement RPC through RabbitMQ, described here http://www.rabbitmq.com/tutorials/tutorial-six-python.html
With pika and twisted i getting 1000 RPC calls per second, similar implementation on asyncio yields only 25 RPC calls per second. Asyncio adapter for pika i implement myself - it is just port of Twisted adapter - https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py RPC server implementation: import sys import time import asyncio import pika from pika.adapters import asyncio_connection @asyncio.coroutine def make_connection(loop, host="localhost", port=5672): def connection_factory(): params = pika.ConnectionParameters() return asyncio_connection.AsyncioProtocolConnection(params, loop=loop) transport, connection = yield from loop.create_connection(connection_factory, host, port) yield from connection.ready return connection @asyncio.coroutine def server(loop): conn = yield from make_connection(loop) chan = yield from conn.channel() conn2 = yield from make_connection(loop) chan2 = yield from conn2.channel() print('Channel', chan) yield from chan.queue_declare(queue='rpc_queue') queue, ctag = yield from chan.basic_consume(queue='rpc_queue', no_ack=True) while True: ch, method, props, body = yield from queue.get() # print('Get', body, props) yield from chan2.basic_publish( exchange='', routing_key=props.reply_to, body=body[::-1]) loop = asyncio.get_event_loop() try: task = asyncio.ensure_future(server(loop)) loop.run_until_complete(task) except KeyboardInterrupt: print('Done') RPC client implementation: import sys import time import asyncio import pika from pika.adapters import asyncio_connection class Counter: def __init__(self, name=''): self.start = time.time() self.cnt = 0 self.name = name def reset(self): self.start = time.time() self.cnt = 0 def inc(self, value=1): self.cnt += value def summary(self): now = time.time() return { 'time': now - self.start, 'count': self.cnt, 'rate': self.cnt / (now - self.start) } def __str__(self): return '{name}: time: {time}, count: {count}, rate: {rate}'.format( name=self.name, **self.summary()) c = Counter('Counter') @asyncio.coroutine def make_connection(loop, host="localhost", port=5672): def connection_factory(): params = pika.ConnectionParameters() return asyncio_connection.AsyncioProtocolConnection(params, loop=loop) transport, connection = yield from loop.create_connection(connection_factory, host, port) yield from connection.ready return connection @asyncio.coroutine def call(loop): global c conn = yield from make_connection(loop) chan = yield from conn.channel() print('Channel', chan) result = yield from chan.queue_declare(exclusive=True) cb_queue = result.method.queue print('CBQ', cb_queue) queue, ctag = yield from chan.basic_consume(queue=cb_queue, no_ack=True) c.reset() while True: yield from chan.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=cb_queue, ), body='Hello World!') ch, method, props, body = yield from queue.get() c.inc() loop = asyncio.get_event_loop() try: task = asyncio.ensure_future(call(loop)) loop.run_until_complete(task) except KeyboardInterrupt: print('Done\n', c) all sources (with twisted-based implementation) are here https://github.com/appetito/pika_tests I can speedup asyncio RPC implementation by adding second connection to RabbitMQ - to use one connection for reading and another connection for writing, but i don't like this decision. What i am doing wrong? P.S. i also trying to use https://github.com/polyconseil/aioamqp - with the same result
