It works! Thank you very much! I have added transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True).
is there any caveats? possible problems? On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali wrote: > > It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? > > 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov <[email protected] > <javascript:>>: > > Hi! > > > > I am trying to implement RPC through RabbitMQ, described here > > http://www.rabbitmq.com/tutorials/tutorial-six-python.html > > > > With pika and twisted i getting 1000 RPC calls per second, similar > > implementation on asyncio yields only 25 RPC calls per second. > > Asyncio adapter for pika i implement myself - it is just port of Twisted > > adapter - > > > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py > > > > > RPC server implementation: > > > > > > import sys > > import time > > import asyncio > > import pika > > > > from pika.adapters import asyncio_connection > > > > > > @asyncio.coroutine > > def make_connection(loop, host="localhost", port=5672): > > > > def connection_factory(): > > params = pika.ConnectionParameters() > > return asyncio_connection.AsyncioProtocolConnection(params, > > loop=loop) > > > > transport, connection = yield from > > loop.create_connection(connection_factory, host, port) > > yield from connection.ready > > return connection > > > > > > @asyncio.coroutine > > def server(loop): > > conn = yield from make_connection(loop) > > chan = yield from conn.channel() > > > > conn2 = yield from make_connection(loop) > > chan2 = yield from conn2.channel() > > print('Channel', chan) > > yield from chan.queue_declare(queue='rpc_queue') > > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', > > no_ack=True) > > > > while True: > > ch, method, props, body = yield from queue.get() > > # print('Get', body, props) > > yield from chan2.basic_publish( > > exchange='', > > routing_key=props.reply_to, > > body=body[::-1]) > > > > > > loop = asyncio.get_event_loop() > > > > try: > > task = 
asyncio.ensure_future(server(loop)) > > loop.run_until_complete(task) > > except KeyboardInterrupt: > > print('Done') > > > > RPC client implementation: > > > > > > > > import sys > > import time > > import asyncio > > import pika > > > > from pika.adapters import asyncio_connection > > > > > > class Counter: > > > > def __init__(self, name=''): > > self.start = time.time() > > self.cnt = 0 > > self.name = name > > > > def reset(self): > > self.start = time.time() > > self.cnt = 0 > > > > def inc(self, value=1): > > self.cnt += value > > > > def summary(self): > > now = time.time() > > return { > > 'time': now - self.start, > > 'count': self.cnt, > > 'rate': self.cnt / (now - self.start) > > } > > > > def __str__(self): > > return '{name}: time: {time}, count: {count}, rate: > {rate}'.format( > > name=self.name, **self.summary()) > > > > > > c = Counter('Counter') > > > > > > @asyncio.coroutine > > def make_connection(loop, host="localhost", port=5672): > > > > def connection_factory(): > > params = pika.ConnectionParameters() > > return asyncio_connection.AsyncioProtocolConnection(params, > > loop=loop) > > > > transport, connection = yield from > > loop.create_connection(connection_factory, host, port) > > yield from connection.ready > > return connection > > > > > > @asyncio.coroutine > > def call(loop): > > global c > > conn = yield from make_connection(loop) > > chan = yield from conn.channel() > > > > print('Channel', chan) > > result = yield from chan.queue_declare(exclusive=True) > > cb_queue = result.method.queue > > print('CBQ', cb_queue) > > queue, ctag = yield from chan.basic_consume(queue=cb_queue, > no_ack=True) > > c.reset() > > > > while True: > > yield from chan.basic_publish( > > exchange='', > > routing_key='rpc_queue', > > properties=pika.BasicProperties( > > reply_to=cb_queue, > > ), > > body='Hello World!') > > > > ch, method, props, body = yield from queue.get() > > c.inc() > > > > loop = asyncio.get_event_loop() > > > > try: > > task = 
asyncio.ensure_future(call(loop)) > > loop.run_until_complete(task) > > except KeyboardInterrupt: > > print('Done\n', c) > > > > > > all sources (with twisted-based implementation) are here > > https://github.com/appetito/pika_tests > > > > I can speedup asyncio RPC implementation by adding second connection to > > RabbitMQ - to use one connection for reading and another connection for > > writing, but i don't like this decision. > > > > What i am doing wrong? > > > > P.S. > > i also trying to use https://github.com/polyconseil/aioamqp - with the > same > > result > > > > >
