Re: [python-tulip] RabbitMQ RPC problem with asyncio
On 2016-01-18 2:50 PM, Charles-François Natali wrote: > No caveats, actually I think this should be the default (that's the case for example for all TCP sockets in Go, zmq also sets it, and all web browsers). asyncio will set it by default in 3.6. Yury
Re: [python-tulip] RabbitMQ RPC problem with asyncio
What performance do you get after setting TCP_NODELAY? On Monday, January 18, 2016 at 3:11:33 PM UTC+3, Yuriy Homyakov wrote: > > Its works! Thank you very much! > > i have added > transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) > > is there any caveats? possible problems? > > On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali > wrote: >> >> It's just a random guess, but does asyncio set TCP_NODELAY on the >> sockets? >> >> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov: >> > Hi! >> > >> > I am trying to implement RPC through RabbitMQ, described here >> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html >> > >> > With pika and twisted i getting 1000 RPC calls per second, similar >> > implementation on asyncio yields only 25 RPC calls per second. >> > Asyncio adapter for pika i implement myself - it is just port of >> Twisted >> > adapter - >> > >> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py >> >> > >> > RPC server implementation: >> > >> > >> > import sys >> > import time >> > import asyncio >> > import pika >> > >> > from pika.adapters import asyncio_connection >> > >> > >> > @asyncio.coroutine >> > def make_connection(loop, host="localhost", port=5672): >> > >> > def connection_factory(): >> > params = pika.ConnectionParameters() >> > return asyncio_connection.AsyncioProtocolConnection(params, >> > loop=loop) >> > >> > transport, connection = yield from >> > loop.create_connection(connection_factory, host, port) >> > yield from connection.ready >> > return connection >> > >> > >> > @asyncio.coroutine >> > def server(loop): >> > conn = yield from make_connection(loop) >> > chan = yield from conn.channel() >> > >> > conn2 = yield from make_connection(loop) >> > chan2 = yield from conn2.channel() >> > print('Channel', chan) >> > yield from chan.queue_declare(queue='rpc_queue') >> > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', >> > no_ack=True) >> > >> > while 
True: >> > ch, method, props, body = yield from queue.get() >> > # print('Get', body, props) >> > yield from chan2.basic_publish( >> > exchange='', >> > routing_key=props.reply_to, >> > body=body[::-1]) >> > >> > >> > loop = asyncio.get_event_loop() >> > >> > try: >> > task = asyncio.ensure_future(server(loop)) >> > loop.run_until_complete(task) >> > except KeyboardInterrupt: >> > print('Done') >> > >> > RPC client implementation: >> > >> > >> > >> > import sys >> > import time >> > import asyncio >> > import pika >> > >> > from pika.adapters import asyncio_connection >> > >> > >> > class Counter: >> > >> > def __init__(self, name=''): >> > self.start = time.time() >> > self.cnt = 0 >> > self.name = name >> > >> > def reset(self): >> > self.start = time.time() >> > self.cnt = 0 >> > >> > def inc(self, value=1): >> > self.cnt += value >> > >> > def summary(self): >> > now = time.time() >> > return { >> > 'time': now - self.start, >> > 'count': self.cnt, >> > 'rate': self.cnt / (now - self.start) >> > } >> > >> > def __str__(self): >> > return '{name}: time: {time}, count: {count}, rate: >> {rate}'.format( >> > name=self.name, **self.summary()) >> > >> > >> > c = Counter('Counter') >> > >> > >> > @asyncio.coroutine >> > def make_connection(loop, host="localhost", port=5672): >> > >> > def connection_factory(): >> > params = pika.ConnectionParameters() >> > return asyncio_connection.AsyncioProtocolConnection(params, >> > loop=loop) >> > >> > transport, connection = yield from >> > loop.create_connection(connection_factory, host, port) >> > yield from connection.ready >> > return connection >> > >> > >> > @asyncio.coroutine >> > def call(loop): >> > global c >> > conn = yield from make_connection(loop) >> > chan = yield from conn.channel() >> > >> > print('Channel', chan) >> > result = yield from chan.queue_declare(exclusive=True) >> > cb_queue = result.method.queue >> > print('CBQ', cb_queue) >> > queue, ctag = yield from chan.basic_consume(queue=cb_queue, >> 
no_ack=True) >> > c.reset() >> > >> > while True: >> > yield from chan.basic_publish( >> > exchange='', >> > routing_key='rpc_queue', >> > properties=pika.BasicProperties( >> > reply_to=cb_queue, >> > ), >> > body='Hello World!') >> > >> > ch, method, props, body = yield from queue.get() >> > c.inc() >> > >> > loop = asyncio.get_event_loop() >> > >> > try: >> >
Re: [python-tulip] RabbitMQ RPC problem with asyncio
It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov: > Hi! > > I am trying to implement RPC through RabbitMQ, described here > http://www.rabbitmq.com/tutorials/tutorial-six-python.html > > With pika and twisted i getting 1000 RPC calls per second, similar > implementation on asyncio yields only 25 RPC calls per second. > Asyncio adapter for pika i implement myself - it is just port of Twisted > adapter - > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py > > RPC server implementation: > > > import sys > import time > import asyncio > import pika > > from pika.adapters import asyncio_connection > > > @asyncio.coroutine > def make_connection(loop, host="localhost", port=5672): > > def connection_factory(): > params = pika.ConnectionParameters() > return asyncio_connection.AsyncioProtocolConnection(params, > loop=loop) > > transport, connection = yield from > loop.create_connection(connection_factory, host, port) > yield from connection.ready > return connection > > > @asyncio.coroutine > def server(loop): > conn = yield from make_connection(loop) > chan = yield from conn.channel() > > conn2 = yield from make_connection(loop) > chan2 = yield from conn2.channel() > print('Channel', chan) > yield from chan.queue_declare(queue='rpc_queue') > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', > no_ack=True) > > while True: > ch, method, props, body = yield from queue.get() > # print('Get', body, props) > yield from chan2.basic_publish( > exchange='', > routing_key=props.reply_to, > body=body[::-1]) > > > loop = asyncio.get_event_loop() > > try: > task = asyncio.ensure_future(server(loop)) > loop.run_until_complete(task) > except KeyboardInterrupt: > print('Done') > > RPC client implementation: > > > > import sys > import time > import asyncio > import pika > > from pika.adapters import asyncio_connection > > > class Counter: > > def __init__(self, name=''): > 
self.start = time.time() > self.cnt = 0 > self.name = name > > def reset(self): > self.start = time.time() > self.cnt = 0 > > def inc(self, value=1): > self.cnt += value > > def summary(self): > now = time.time() > return { > 'time': now - self.start, > 'count': self.cnt, > 'rate': self.cnt / (now - self.start) > } > > def __str__(self): > return '{name}: time: {time}, count: {count}, rate: {rate}'.format( > name=self.name, **self.summary()) > > > c = Counter('Counter') > > > @asyncio.coroutine > def make_connection(loop, host="localhost", port=5672): > > def connection_factory(): > params = pika.ConnectionParameters() > return asyncio_connection.AsyncioProtocolConnection(params, > loop=loop) > > transport, connection = yield from > loop.create_connection(connection_factory, host, port) > yield from connection.ready > return connection > > > @asyncio.coroutine > def call(loop): > global c > conn = yield from make_connection(loop) > chan = yield from conn.channel() > > print('Channel', chan) > result = yield from chan.queue_declare(exclusive=True) > cb_queue = result.method.queue > print('CBQ', cb_queue) > queue, ctag = yield from chan.basic_consume(queue=cb_queue, no_ack=True) > c.reset() > > while True: > yield from chan.basic_publish( > exchange='', > routing_key='rpc_queue', > properties=pika.BasicProperties( > reply_to=cb_queue, > ), > body='Hello World!') > > ch, method, props, body = yield from queue.get() > c.inc() > > loop = asyncio.get_event_loop() > > try: > task = asyncio.ensure_future(call(loop)) > loop.run_until_complete(task) > except KeyboardInterrupt: > print('Done\n', c) > > > all sources (with twisted-based implementation) are here > https://github.com/appetito/pika_tests > > I can speedup asyncio RPC implementation by adding second connection to > RabbitMQ - to use one connection for reading and another connection for > writing, but i don't like this decision. > > What i am doing wrong? > > P.S. 
> i also trying to use https://github.com/polyconseil/aioamqp - with the same > result > >
Re: [python-tulip] RabbitMQ RPC problem with asyncio
Its works! Thank you very much! i have added transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) is there any caveats? possible problems? On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali wrote: > > It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? > > 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov>: > > Hi! > > > > I am trying to implement RPC through RabbitMQ, described here > > http://www.rabbitmq.com/tutorials/tutorial-six-python.html > > > > With pika and twisted i getting 1000 RPC calls per second, similar > > implementation on asyncio yields only 25 RPC calls per second. > > Asyncio adapter for pika i implement myself - it is just port of Twisted > > adapter - > > > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py > > > > > RPC server implementation: > > > > > > import sys > > import time > > import asyncio > > import pika > > > > from pika.adapters import asyncio_connection > > > > > > @asyncio.coroutine > > def make_connection(loop, host="localhost", port=5672): > > > > def connection_factory(): > > params = pika.ConnectionParameters() > > return asyncio_connection.AsyncioProtocolConnection(params, > > loop=loop) > > > > transport, connection = yield from > > loop.create_connection(connection_factory, host, port) > > yield from connection.ready > > return connection > > > > > > @asyncio.coroutine > > def server(loop): > > conn = yield from make_connection(loop) > > chan = yield from conn.channel() > > > > conn2 = yield from make_connection(loop) > > chan2 = yield from conn2.channel() > > print('Channel', chan) > > yield from chan.queue_declare(queue='rpc_queue') > > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', > > no_ack=True) > > > > while True: > > ch, method, props, body = yield from queue.get() > > # print('Get', body, props) > > yield from chan2.basic_publish( > > exchange='', > > routing_key=props.reply_to, > > body=body[::-1]) > > > > > 
> loop = asyncio.get_event_loop() > > > > try: > > task = asyncio.ensure_future(server(loop)) > > loop.run_until_complete(task) > > except KeyboardInterrupt: > > print('Done') > > > > RPC client implementation: > > > > > > > > import sys > > import time > > import asyncio > > import pika > > > > from pika.adapters import asyncio_connection > > > > > > class Counter: > > > > def __init__(self, name=''): > > self.start = time.time() > > self.cnt = 0 > > self.name = name > > > > def reset(self): > > self.start = time.time() > > self.cnt = 0 > > > > def inc(self, value=1): > > self.cnt += value > > > > def summary(self): > > now = time.time() > > return { > > 'time': now - self.start, > > 'count': self.cnt, > > 'rate': self.cnt / (now - self.start) > > } > > > > def __str__(self): > > return '{name}: time: {time}, count: {count}, rate: > {rate}'.format( > > name=self.name, **self.summary()) > > > > > > c = Counter('Counter') > > > > > > @asyncio.coroutine > > def make_connection(loop, host="localhost", port=5672): > > > > def connection_factory(): > > params = pika.ConnectionParameters() > > return asyncio_connection.AsyncioProtocolConnection(params, > > loop=loop) > > > > transport, connection = yield from > > loop.create_connection(connection_factory, host, port) > > yield from connection.ready > > return connection > > > > > > @asyncio.coroutine > > def call(loop): > > global c > > conn = yield from make_connection(loop) > > chan = yield from conn.channel() > > > > print('Channel', chan) > > result = yield from chan.queue_declare(exclusive=True) > > cb_queue = result.method.queue > > print('CBQ', cb_queue) > > queue, ctag = yield from chan.basic_consume(queue=cb_queue, > no_ack=True) > > c.reset() > > > > while True: > > yield from chan.basic_publish( > > exchange='', > > routing_key='rpc_queue', > > properties=pika.BasicProperties( > > reply_to=cb_queue, > > ), > > body='Hello World!') > > > > ch, method, props, body = yield from queue.get() > > c.inc() > > > > 
loop = asyncio.get_event_loop() > > > > try: > > task = asyncio.ensure_future(call(loop)) > > loop.run_until_complete(task) > > except KeyboardInterrupt: > > print('Done\n', c) > > > > > > all sources (with twisted-based implementation) are here > > https://github.com/appetito/pika_tests > > > > I can speedup asyncio RPC
Re: [python-tulip] RabbitMQ RPC problem with asyncio
No caveats, actually I think this should be the default (that's the case for example for all TCP sockets in Go, zmq also sets it, and all web browsers). 2016-01-18 12:11 GMT+00:00 Yuriy Homyakov: > Its works! Thank you very much! > > i have added > transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) > > is there any caveats? possible problems? > > On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali > wrote: >> >> It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? >> >> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov : >> > Hi! >> > >> > I am trying to implement RPC through RabbitMQ, described here >> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html >> > >> > With pika and twisted i getting 1000 RPC calls per second, similar >> > implementation on asyncio yields only 25 RPC calls per second. >> > Asyncio adapter for pika i implement myself - it is just port of Twisted >> > adapter - >> > >> > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py >> > >> > RPC server implementation: >> > >> > >> > import sys >> > import time >> > import asyncio >> > import pika >> > >> > from pika.adapters import asyncio_connection >> > >> > >> > @asyncio.coroutine >> > def make_connection(loop, host="localhost", port=5672): >> > >> > def connection_factory(): >> > params = pika.ConnectionParameters() >> > return asyncio_connection.AsyncioProtocolConnection(params, >> > loop=loop) >> > >> > transport, connection = yield from >> > loop.create_connection(connection_factory, host, port) >> > yield from connection.ready >> > return connection >> > >> > >> > @asyncio.coroutine >> > def server(loop): >> > conn = yield from make_connection(loop) >> > chan = yield from conn.channel() >> > >> > conn2 = yield from make_connection(loop) >> > chan2 = yield from conn2.channel() >> > print('Channel', chan) >> > yield from chan.queue_declare(queue='rpc_queue') >> > queue, ctag = yield from 
chan.basic_consume(queue='rpc_queue', >> > no_ack=True) >> > >> > while True: >> > ch, method, props, body = yield from queue.get() >> > # print('Get', body, props) >> > yield from chan2.basic_publish( >> > exchange='', >> > routing_key=props.reply_to, >> > body=body[::-1]) >> > >> > >> > loop = asyncio.get_event_loop() >> > >> > try: >> > task = asyncio.ensure_future(server(loop)) >> > loop.run_until_complete(task) >> > except KeyboardInterrupt: >> > print('Done') >> > >> > RPC client implementation: >> > >> > >> > >> > import sys >> > import time >> > import asyncio >> > import pika >> > >> > from pika.adapters import asyncio_connection >> > >> > >> > class Counter: >> > >> > def __init__(self, name=''): >> > self.start = time.time() >> > self.cnt = 0 >> > self.name = name >> > >> > def reset(self): >> > self.start = time.time() >> > self.cnt = 0 >> > >> > def inc(self, value=1): >> > self.cnt += value >> > >> > def summary(self): >> > now = time.time() >> > return { >> > 'time': now - self.start, >> > 'count': self.cnt, >> > 'rate': self.cnt / (now - self.start) >> > } >> > >> > def __str__(self): >> > return '{name}: time: {time}, count: {count}, rate: >> > {rate}'.format( >> > name=self.name, **self.summary()) >> > >> > >> > c = Counter('Counter') >> > >> > >> > @asyncio.coroutine >> > def make_connection(loop, host="localhost", port=5672): >> > >> > def connection_factory(): >> > params = pika.ConnectionParameters() >> > return asyncio_connection.AsyncioProtocolConnection(params, >> > loop=loop) >> > >> > transport, connection = yield from >> > loop.create_connection(connection_factory, host, port) >> > yield from connection.ready >> > return connection >> > >> > >> > @asyncio.coroutine >> > def call(loop): >> > global c >> > conn = yield from make_connection(loop) >> > chan = yield from conn.channel() >> > >> > print('Channel', chan) >> > result = yield from chan.queue_declare(exclusive=True) >> > cb_queue = result.method.queue >> > print('CBQ', cb_queue) >> 
> queue, ctag = yield from chan.basic_consume(queue=cb_queue, >> > no_ack=True) >> > c.reset() >> > >> > while True: >> > yield from chan.basic_publish( >> > exchange='', >> > routing_key='rpc_queue', >> > properties=pika.BasicProperties( >> > reply_to=cb_queue, >> > ), >> > body='Hello World!') >> > >> > ch, method, props, body = yield from queue.get() >> > c.inc() >> > >> > loop = asyncio.get_event_loop() >> > >> > try: >> > task = asyncio.ensure_future(call(loop)) >> >