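It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? 25 calls per second is roughly 40 ms per round trip, which is the kind of latency you typically see when Nagle's algorithm interacts with delayed ACKs because TCP_NODELAY is not set.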
2016-01-18 7:33 GMT+00:00 Yuriy Homyakov <[email protected]>: > Hi! > > I am trying to implement RPC over RabbitMQ, as described here > http://www.rabbitmq.com/tutorials/tutorial-six-python.html > > With pika and Twisted I am getting 1000 RPC calls per second, while a similar > implementation on asyncio yields only 25 RPC calls per second. > I implemented the asyncio adapter for pika myself - it is just a port of the Twisted > adapter - > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py > > RPC server implementation: > > > import sys > import time > import asyncio > import pika > > from pika.adapters import asyncio_connection > > > @asyncio.coroutine > def make_connection(loop, host="localhost", port=5672): > > def connection_factory(): > params = pika.ConnectionParameters() > return asyncio_connection.AsyncioProtocolConnection(params, > loop=loop) > > transport, connection = yield from > loop.create_connection(connection_factory, host, port) > yield from connection.ready > return connection > > > @asyncio.coroutine > def server(loop): > conn = yield from make_connection(loop) > chan = yield from conn.channel() > > conn2 = yield from make_connection(loop) > chan2 = yield from conn2.channel() > print('Channel', chan) > yield from chan.queue_declare(queue='rpc_queue') > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', > no_ack=True) > > while True: > ch, method, props, body = yield from queue.get() > # print('Get', body, props) > yield from chan2.basic_publish( > exchange='', > routing_key=props.reply_to, > body=body[::-1]) > > > loop = asyncio.get_event_loop() > > try: > task = asyncio.ensure_future(server(loop)) > loop.run_until_complete(task) > except KeyboardInterrupt: > print('Done') > > RPC client implementation: > > > > import sys > import time > import asyncio > import pika > > from pika.adapters import asyncio_connection > > > class Counter: > > def __init__(self, name=''): > self.start = time.time() > self.cnt = 0 > self.name = name > 
> def reset(self): > self.start = time.time() > self.cnt = 0 > > def inc(self, value=1): > self.cnt += value > > def summary(self): > now = time.time() > return { > 'time': now - self.start, > 'count': self.cnt, > 'rate': self.cnt / (now - self.start) > } > > def __str__(self): > return '{name}: time: {time}, count: {count}, rate: {rate}'.format( > name=self.name, **self.summary()) > > > c = Counter('Counter') > > > @asyncio.coroutine > def make_connection(loop, host="localhost", port=5672): > > def connection_factory(): > params = pika.ConnectionParameters() > return asyncio_connection.AsyncioProtocolConnection(params, > loop=loop) > > transport, connection = yield from > loop.create_connection(connection_factory, host, port) > yield from connection.ready > return connection > > > @asyncio.coroutine > def call(loop): > global c > conn = yield from make_connection(loop) > chan = yield from conn.channel() > > print('Channel', chan) > result = yield from chan.queue_declare(exclusive=True) > cb_queue = result.method.queue > print('CBQ', cb_queue) > queue, ctag = yield from chan.basic_consume(queue=cb_queue, no_ack=True) > c.reset() > > while True: > yield from chan.basic_publish( > exchange='', > routing_key='rpc_queue', > properties=pika.BasicProperties( > reply_to=cb_queue, > ), > body='Hello World!') > > ch, method, props, body = yield from queue.get() > c.inc() > > loop = asyncio.get_event_loop() > > try: > task = asyncio.ensure_future(call(loop)) > loop.run_until_complete(task) > except KeyboardInterrupt: > print('Done\n', c) > > > All sources (including the Twisted-based implementation) are here > https://github.com/appetito/pika_tests > > I can speed up the asyncio RPC implementation by adding a second connection to > RabbitMQ - using one connection for reading and another connection for > writing, but I don't like this solution. > > What am I doing wrong? > > P.S. > I also tried using https://github.com/polyconseil/aioamqp - with the same > result > >
