What performance do you get after setting TCP_NODELAY?
On Monday, January 18, 2016 at 3:11:33 PM UTC+3, Yuriy Homyakov wrote: > > Its works! Thank you very much! > > i have added > transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) > > is there any caveats? possible problems? > > On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali > wrote: >> >> It's just a random guess, but does asyncio set TCP_NODELAY on the >> sockets? >> >> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov <yuriy.h...@gmail.com>: >> > Hi! >> > >> > I am trying to implement RPC through RabbitMQ, described here >> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html >> > >> > With pika and twisted i getting 1000 RPC calls per second, similar >> > implementation on asyncio yields only 25 RPC calls per second. >> > Asyncio adapter for pika i implement myself - it is just port of >> Twisted >> > adapter - >> > >> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py >> >> > >> > RPC server implementation: >> > >> > >> > import sys >> > import time >> > import asyncio >> > import pika >> > >> > from pika.adapters import asyncio_connection >> > >> > >> > @asyncio.coroutine >> > def make_connection(loop, host="localhost", port=5672): >> > >> > def connection_factory(): >> > params = pika.ConnectionParameters() >> > return asyncio_connection.AsyncioProtocolConnection(params, >> > loop=loop) >> > >> > transport, connection = yield from >> > loop.create_connection(connection_factory, host, port) >> > yield from connection.ready >> > return connection >> > >> > >> > @asyncio.coroutine >> > def server(loop): >> > conn = yield from make_connection(loop) >> > chan = yield from conn.channel() >> > >> > conn2 = yield from make_connection(loop) >> > chan2 = yield from conn2.channel() >> > print('Channel', chan) >> > yield from chan.queue_declare(queue='rpc_queue') >> > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', >> > no_ack=True) >> > >> > while True: >> > ch, method, props, body = yield from queue.get() >> > # print('Get', body, props) >> > yield from chan2.basic_publish( >> > exchange='', >> > routing_key=props.reply_to, >> > body=body[::-1]) >> > >> > >> > loop = asyncio.get_event_loop() >> > >> > try: >> > task = asyncio.ensure_future(server(loop)) >> > loop.run_until_complete(task) >> > except KeyboardInterrupt: >> > print('Done') >> > >> > RPC client implementation: >> > >> > >> > >> > import sys >> > import time >> > import asyncio >> > import pika >> > >> > from pika.adapters import asyncio_connection >> > >> > >> > class Counter: >> > >> > def __init__(self, name=''): >> > self.start = time.time() >> > self.cnt = 0 >> > self.name = name >> > >> > def reset(self): >> > self.start = time.time() >> > self.cnt = 0 >> > >> > def inc(self, value=1): >> > self.cnt += value >> > >> > def summary(self): >> > now = time.time() >> > return { >> > 'time': now - self.start, >> > 'count': self.cnt, >> > 'rate': self.cnt / (now - self.start) >> > } >> > >> > def __str__(self): >> > return '{name}: time: {time}, count: {count}, rate: >> {rate}'.format( >> > name=self.name, **self.summary()) >> > >> > >> > c = Counter('Counter') >> > >> > >> > @asyncio.coroutine >> > def make_connection(loop, host="localhost", port=5672): >> > >> > def connection_factory(): >> > params = pika.ConnectionParameters() >> > return asyncio_connection.AsyncioProtocolConnection(params, >> > loop=loop) >> > >> > transport, connection = yield from >> > loop.create_connection(connection_factory, host, port) >> > yield from connection.ready >> > return connection >> > >> > >> > @asyncio.coroutine >> > def call(loop): >> > global c >> > conn = yield from make_connection(loop) >> > chan = yield from conn.channel() >> > >> > print('Channel', chan) >> > result = yield from chan.queue_declare(exclusive=True) >> > cb_queue = result.method.queue >> > print('CBQ', cb_queue) >> > queue, ctag = yield from chan.basic_consume(queue=cb_queue, >> no_ack=True) >> > c.reset() >> > >> > while True: >> > yield from chan.basic_publish( >> > exchange='', >> > routing_key='rpc_queue', >> > properties=pika.BasicProperties( >> > reply_to=cb_queue, >> > ), >> > body='Hello World!') >> > >> > ch, method, props, body = yield from queue.get() >> > c.inc() >> > >> > loop = asyncio.get_event_loop() >> > >> > try: >> > task = asyncio.ensure_future(call(loop)) >> > loop.run_until_complete(task) >> > except KeyboardInterrupt: >> > print('Done\n', c) >> > >> > >> > all sources (with twisted-based implementation) are here >> > https://github.com/appetito/pika_tests >> > >> > I can speedup asyncio RPC implementation by adding second connection to >> > RabbitMQ - to use one connection for reading and another connection for >> > writing, but i don't like this decision. >> > >> > What i am doing wrong? >> > >> > P.S. >> > i also trying to use https://github.com/polyconseil/aioamqp - with the >> same >> > result >> > >> > >> >