Re: [python-tulip] RabbitMQ RPC problem with asyncio

2016-11-23 Thread Yury Selivanov



On 2016-01-18 2:50 PM, Charles-François Natali wrote:

No caveats, actually I think this should be the default (that's the


asyncio will set it by default in 3.6.

Yury


Re: [python-tulip] RabbitMQ RPC problem with asyncio

2016-11-23 Thread Anton VV
What performance do you get after setting TCP_NODELAY?

On Monday, January 18, 2016 at 3:11:33 PM UTC+3, Yuriy Homyakov wrote:
>
> Its works! Thank you very much!
>
> i have added
> transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
>
> is there any caveats? possible problems? 
>
> On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali 
> wrote:
>>
>> It's just a random guess, but does asyncio set TCP_NODELAY on the 
>> sockets? 
>>
>> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov : 
>> > Hi! 
>> > 
>> > I am trying to implement RPC through RabbitMQ, described  here 
>> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html 
>> > 
>> > With pika and twisted i getting 1000 RPC calls per second, similar 
>> > implementation on asyncio yields only 25 RPC calls per second. 
>> > Asyncio adapter for pika i implement myself - it is just port of 
>> Twisted 
>> > adapter - 
>> > 
>> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>>  
>> > 
>> > RPC server implementation: 
>> > 
>> > 
>> > import sys 
>> > import time 
>> > import asyncio 
>> > import pika 
>> > 
>> > from pika.adapters import asyncio_connection 
>> > 
>> > 
>> > @asyncio.coroutine 
>> > def make_connection(loop, host="localhost", port=5672): 
>> > 
>> > def connection_factory(): 
>> > params = pika.ConnectionParameters() 
>> > return asyncio_connection.AsyncioProtocolConnection(params, 
>> > loop=loop) 
>> > 
>> > transport, connection = yield from 
>> > loop.create_connection(connection_factory, host, port) 
>> > yield from connection.ready 
>> > return connection 
>> > 
>> > 
>> > @asyncio.coroutine 
>> > def server(loop): 
>> > conn = yield from make_connection(loop) 
>> > chan = yield from conn.channel() 
>> > 
>> > conn2 = yield from make_connection(loop) 
>> > chan2 = yield from conn2.channel() 
>> > print('Channel', chan) 
>> > yield from chan.queue_declare(queue='rpc_queue') 
>> > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', 
>> > no_ack=True) 
>> > 
>> > while True: 
>> > ch, method, props, body = yield from queue.get() 
>> > # print('Get', body, props) 
>> > yield from chan2.basic_publish( 
>> > exchange='', 
>> > routing_key=props.reply_to, 
>> > body=body[::-1]) 
>> > 
>> > 
>> > loop = asyncio.get_event_loop() 
>> > 
>> > try: 
>> > task = asyncio.ensure_future(server(loop)) 
>> > loop.run_until_complete(task) 
>> > except KeyboardInterrupt: 
>> > print('Done') 
>> > 
>> > RPC client implementation: 
>> > 
>> > 
>> > 
>> > import sys 
>> > import time 
>> > import asyncio 
>> > import pika 
>> > 
>> > from pika.adapters import asyncio_connection 
>> > 
>> > 
>> > class Counter: 
>> > 
>> > def __init__(self, name=''): 
>> > self.start = time.time() 
>> > self.cnt = 0 
>> > self.name = name 
>> > 
>> > def reset(self): 
>> > self.start = time.time() 
>> > self.cnt = 0 
>> > 
>> > def inc(self, value=1): 
>> > self.cnt += value 
>> > 
>> > def summary(self): 
>> > now = time.time() 
>> > return { 
>> > 'time': now - self.start, 
>> > 'count': self.cnt, 
>> > 'rate': self.cnt / (now - self.start) 
>> > } 
>> > 
>> > def __str__(self): 
>> > return '{name}: time: {time}, count: {count}, rate: 
>> {rate}'.format( 
>> > name=self.name, **self.summary()) 
>> > 
>> > 
>> > c = Counter('Counter') 
>> > 
>> > 
>> > @asyncio.coroutine 
>> > def make_connection(loop, host="localhost", port=5672): 
>> > 
>> > def connection_factory(): 
>> > params = pika.ConnectionParameters() 
>> > return asyncio_connection.AsyncioProtocolConnection(params, 
>> > loop=loop) 
>> > 
>> > transport, connection = yield from 
>> > loop.create_connection(connection_factory, host, port) 
>> > yield from connection.ready 
>> > return connection 
>> > 
>> > 
>> > @asyncio.coroutine 
>> > def call(loop): 
>> > global c 
>> > conn = yield from make_connection(loop) 
>> > chan = yield from conn.channel() 
>> > 
>> > print('Channel', chan) 
>> > result = yield from chan.queue_declare(exclusive=True) 
>> > cb_queue = result.method.queue 
>> > print('CBQ', cb_queue) 
>> > queue, ctag = yield from chan.basic_consume(queue=cb_queue, 
>> no_ack=True) 
>> > c.reset() 
>> > 
>> > while True: 
>> > yield from chan.basic_publish( 
>> > exchange='', 
>> > routing_key='rpc_queue', 
>> > properties=pika.BasicProperties( 
>> >  reply_to=cb_queue, 
>> >  ), 
>> > body='Hello World!') 
>> > 
>> > ch, method, props, body = yield from queue.get() 
>> > c.inc() 
>> > 
>> > loop = asyncio.get_event_loop() 
>> > 
>> > try: 
>> > 

Re: [python-tulip] RabbitMQ RPC problem with asyncio

2016-01-18 Thread Charles-François Natali
It's just a random guess, but does asyncio set TCP_NODELAY on the sockets?

2016-01-18 7:33 GMT+00:00 Yuriy Homyakov :
> Hi!
>
> I am trying to implement RPC through RabbitMQ, described  here
> http://www.rabbitmq.com/tutorials/tutorial-six-python.html
>
> With pika and Twisted I am getting 1000 RPC calls per second, while a similar
> implementation on asyncio yields only 25 RPC calls per second.
> I implemented the asyncio adapter for pika myself - it is just a port of the
> Twisted adapter -
> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>
> RPC server implementation:
>
>
> import sys
> import time
> import asyncio
> import pika
>
> from pika.adapters import asyncio_connection
>
>
> @asyncio.coroutine
> def make_connection(loop, host="localhost", port=5672):
>
> def connection_factory():
> params = pika.ConnectionParameters()
> return asyncio_connection.AsyncioProtocolConnection(params,
> loop=loop)
>
> transport, connection = yield from
> loop.create_connection(connection_factory, host, port)
> yield from connection.ready
> return connection
>
>
> @asyncio.coroutine
> def server(loop):
> conn = yield from make_connection(loop)
> chan = yield from conn.channel()
>
> conn2 = yield from make_connection(loop)
> chan2 = yield from conn2.channel()
> print('Channel', chan)
> yield from chan.queue_declare(queue='rpc_queue')
> queue, ctag = yield from chan.basic_consume(queue='rpc_queue',
> no_ack=True)
>
> while True:
> ch, method, props, body = yield from queue.get()
> # print('Get', body, props)
> yield from chan2.basic_publish(
> exchange='',
> routing_key=props.reply_to,
> body=body[::-1])
>
>
> loop = asyncio.get_event_loop()
>
> try:
> task = asyncio.ensure_future(server(loop))
> loop.run_until_complete(task)
> except KeyboardInterrupt:
> print('Done')
>
> RPC client implementation:
>
>
>
> import sys
> import time
> import asyncio
> import pika
>
> from pika.adapters import asyncio_connection
>
>
> class Counter:
>
> def __init__(self, name=''):
> self.start = time.time()
> self.cnt = 0
> self.name = name
>
> def reset(self):
> self.start = time.time()
> self.cnt = 0
>
> def inc(self, value=1):
> self.cnt += value
>
> def summary(self):
> now = time.time()
> return {
> 'time': now - self.start,
> 'count': self.cnt,
> 'rate': self.cnt / (now - self.start)
> }
>
> def __str__(self):
> return '{name}: time: {time}, count: {count}, rate: {rate}'.format(
> name=self.name, **self.summary())
>
>
> c = Counter('Counter')
>
>
> @asyncio.coroutine
> def make_connection(loop, host="localhost", port=5672):
>
> def connection_factory():
> params = pika.ConnectionParameters()
> return asyncio_connection.AsyncioProtocolConnection(params,
> loop=loop)
>
> transport, connection = yield from
> loop.create_connection(connection_factory, host, port)
> yield from connection.ready
> return connection
>
>
> @asyncio.coroutine
> def call(loop):
> global c
> conn = yield from make_connection(loop)
> chan = yield from conn.channel()
>
> print('Channel', chan)
> result = yield from chan.queue_declare(exclusive=True)
> cb_queue = result.method.queue
> print('CBQ', cb_queue)
> queue, ctag = yield from chan.basic_consume(queue=cb_queue, no_ack=True)
> c.reset()
>
> while True:
> yield from chan.basic_publish(
> exchange='',
> routing_key='rpc_queue',
> properties=pika.BasicProperties(
>  reply_to=cb_queue,
>  ),
> body='Hello World!')
>
> ch, method, props, body = yield from queue.get()
> c.inc()
>
> loop = asyncio.get_event_loop()
>
> try:
> task = asyncio.ensure_future(call(loop))
> loop.run_until_complete(task)
> except KeyboardInterrupt:
> print('Done\n', c)
>
>
> all sources (with twisted-based implementation) are here
> https://github.com/appetito/pika_tests
>
> I can speed up the asyncio RPC implementation by adding a second connection
> to RabbitMQ - using one connection for reading and another connection for
> writing - but I don't like this solution.
>
> What am I doing wrong?
>
> P.S.
> I am also trying to use https://github.com/polyconseil/aioamqp - with the
> same result
>
>


Re: [python-tulip] RabbitMQ RPC problem with asyncio

2016-01-18 Thread Yuriy Homyakov
It works! Thank you very much!

i have added
transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)

Are there any caveats or possible problems?

On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali 
wrote:
>
> It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? 
>
> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov  >: 
> > Hi! 
> > 
> > I am trying to implement RPC through RabbitMQ, described  here 
> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html 
> > 
> > With pika and twisted i getting 1000 RPC calls per second, similar 
> > implementation on asyncio yields only 25 RPC calls per second. 
> > Asyncio adapter for pika i implement myself - it is just port of Twisted 
> > adapter - 
> > 
> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>  
> > 
> > RPC server implementation: 
> > 
> > 
> > import sys 
> > import time 
> > import asyncio 
> > import pika 
> > 
> > from pika.adapters import asyncio_connection 
> > 
> > 
> > @asyncio.coroutine 
> > def make_connection(loop, host="localhost", port=5672): 
> > 
> > def connection_factory(): 
> > params = pika.ConnectionParameters() 
> > return asyncio_connection.AsyncioProtocolConnection(params, 
> > loop=loop) 
> > 
> > transport, connection = yield from 
> > loop.create_connection(connection_factory, host, port) 
> > yield from connection.ready 
> > return connection 
> > 
> > 
> > @asyncio.coroutine 
> > def server(loop): 
> > conn = yield from make_connection(loop) 
> > chan = yield from conn.channel() 
> > 
> > conn2 = yield from make_connection(loop) 
> > chan2 = yield from conn2.channel() 
> > print('Channel', chan) 
> > yield from chan.queue_declare(queue='rpc_queue') 
> > queue, ctag = yield from chan.basic_consume(queue='rpc_queue', 
> > no_ack=True) 
> > 
> > while True: 
> > ch, method, props, body = yield from queue.get() 
> > # print('Get', body, props) 
> > yield from chan2.basic_publish( 
> > exchange='', 
> > routing_key=props.reply_to, 
> > body=body[::-1]) 
> > 
> > 
> > loop = asyncio.get_event_loop() 
> > 
> > try: 
> > task = asyncio.ensure_future(server(loop)) 
> > loop.run_until_complete(task) 
> > except KeyboardInterrupt: 
> > print('Done') 
> > 
> > RPC client implementation: 
> > 
> > 
> > 
> > import sys 
> > import time 
> > import asyncio 
> > import pika 
> > 
> > from pika.adapters import asyncio_connection 
> > 
> > 
> > class Counter: 
> > 
> > def __init__(self, name=''): 
> > self.start = time.time() 
> > self.cnt = 0 
> > self.name = name 
> > 
> > def reset(self): 
> > self.start = time.time() 
> > self.cnt = 0 
> > 
> > def inc(self, value=1): 
> > self.cnt += value 
> > 
> > def summary(self): 
> > now = time.time() 
> > return { 
> > 'time': now - self.start, 
> > 'count': self.cnt, 
> > 'rate': self.cnt / (now - self.start) 
> > } 
> > 
> > def __str__(self): 
> > return '{name}: time: {time}, count: {count}, rate: 
> {rate}'.format( 
> > name=self.name, **self.summary()) 
> > 
> > 
> > c = Counter('Counter') 
> > 
> > 
> > @asyncio.coroutine 
> > def make_connection(loop, host="localhost", port=5672): 
> > 
> > def connection_factory(): 
> > params = pika.ConnectionParameters() 
> > return asyncio_connection.AsyncioProtocolConnection(params, 
> > loop=loop) 
> > 
> > transport, connection = yield from 
> > loop.create_connection(connection_factory, host, port) 
> > yield from connection.ready 
> > return connection 
> > 
> > 
> > @asyncio.coroutine 
> > def call(loop): 
> > global c 
> > conn = yield from make_connection(loop) 
> > chan = yield from conn.channel() 
> > 
> > print('Channel', chan) 
> > result = yield from chan.queue_declare(exclusive=True) 
> > cb_queue = result.method.queue 
> > print('CBQ', cb_queue) 
> > queue, ctag = yield from chan.basic_consume(queue=cb_queue, 
> no_ack=True) 
> > c.reset() 
> > 
> > while True: 
> > yield from chan.basic_publish( 
> > exchange='', 
> > routing_key='rpc_queue', 
> > properties=pika.BasicProperties( 
> >  reply_to=cb_queue, 
> >  ), 
> > body='Hello World!') 
> > 
> > ch, method, props, body = yield from queue.get() 
> > c.inc() 
> > 
> > loop = asyncio.get_event_loop() 
> > 
> > try: 
> > task = asyncio.ensure_future(call(loop)) 
> > loop.run_until_complete(task) 
> > except KeyboardInterrupt: 
> > print('Done\n', c) 
> > 
> > 
> > all sources (with twisted-based implementation) are here 
> > https://github.com/appetito/pika_tests 
> > 
> > I can speedup asyncio RPC 

Re: [python-tulip] RabbitMQ RPC problem with asyncio

2016-01-18 Thread Charles-François Natali
No caveats, actually I think this should be the default (that's the
case for example for all TCP sockets in Go, zmq also sets it, and all
web browsers).



2016-01-18 12:11 GMT+00:00 Yuriy Homyakov :
> Its works! Thank you very much!
>
> i have added
> transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
>
> is there any caveats? possible problems?
>
> On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali
> wrote:
>>
>> It's just a random guess, but does asyncio set TCP_NODELAY on the sockets?
>>
>> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov :
>> > Hi!
>> >
>> > I am trying to implement RPC through RabbitMQ, described  here
>> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html
>> >
>> > With pika and twisted i getting 1000 RPC calls per second, similar
>> > implementation on asyncio yields only 25 RPC calls per second.
>> > Asyncio adapter for pika i implement myself - it is just port of Twisted
>> > adapter -
>> >
>> > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>> >
>> > RPC server implementation:
>> >
>> >
>> > import sys
>> > import time
>> > import asyncio
>> > import pika
>> >
>> > from pika.adapters import asyncio_connection
>> >
>> >
>> > @asyncio.coroutine
>> > def make_connection(loop, host="localhost", port=5672):
>> >
>> > def connection_factory():
>> > params = pika.ConnectionParameters()
>> > return asyncio_connection.AsyncioProtocolConnection(params,
>> > loop=loop)
>> >
>> > transport, connection = yield from
>> > loop.create_connection(connection_factory, host, port)
>> > yield from connection.ready
>> > return connection
>> >
>> >
>> > @asyncio.coroutine
>> > def server(loop):
>> > conn = yield from make_connection(loop)
>> > chan = yield from conn.channel()
>> >
>> > conn2 = yield from make_connection(loop)
>> > chan2 = yield from conn2.channel()
>> > print('Channel', chan)
>> > yield from chan.queue_declare(queue='rpc_queue')
>> > queue, ctag = yield from chan.basic_consume(queue='rpc_queue',
>> > no_ack=True)
>> >
>> > while True:
>> > ch, method, props, body = yield from queue.get()
>> > # print('Get', body, props)
>> > yield from chan2.basic_publish(
>> > exchange='',
>> > routing_key=props.reply_to,
>> > body=body[::-1])
>> >
>> >
>> > loop = asyncio.get_event_loop()
>> >
>> > try:
>> > task = asyncio.ensure_future(server(loop))
>> > loop.run_until_complete(task)
>> > except KeyboardInterrupt:
>> > print('Done')
>> >
>> > RPC client implementation:
>> >
>> >
>> >
>> > import sys
>> > import time
>> > import asyncio
>> > import pika
>> >
>> > from pika.adapters import asyncio_connection
>> >
>> >
>> > class Counter:
>> >
>> > def __init__(self, name=''):
>> > self.start = time.time()
>> > self.cnt = 0
>> > self.name = name
>> >
>> > def reset(self):
>> > self.start = time.time()
>> > self.cnt = 0
>> >
>> > def inc(self, value=1):
>> > self.cnt += value
>> >
>> > def summary(self):
>> > now = time.time()
>> > return {
>> > 'time': now - self.start,
>> > 'count': self.cnt,
>> > 'rate': self.cnt / (now - self.start)
>> > }
>> >
>> > def __str__(self):
>> > return '{name}: time: {time}, count: {count}, rate:
>> > {rate}'.format(
>> > name=self.name, **self.summary())
>> >
>> >
>> > c = Counter('Counter')
>> >
>> >
>> > @asyncio.coroutine
>> > def make_connection(loop, host="localhost", port=5672):
>> >
>> > def connection_factory():
>> > params = pika.ConnectionParameters()
>> > return asyncio_connection.AsyncioProtocolConnection(params,
>> > loop=loop)
>> >
>> > transport, connection = yield from
>> > loop.create_connection(connection_factory, host, port)
>> > yield from connection.ready
>> > return connection
>> >
>> >
>> > @asyncio.coroutine
>> > def call(loop):
>> > global c
>> > conn = yield from make_connection(loop)
>> > chan = yield from conn.channel()
>> >
>> > print('Channel', chan)
>> > result = yield from chan.queue_declare(exclusive=True)
>> > cb_queue = result.method.queue
>> > print('CBQ', cb_queue)
>> > queue, ctag = yield from chan.basic_consume(queue=cb_queue,
>> > no_ack=True)
>> > c.reset()
>> >
>> > while True:
>> > yield from chan.basic_publish(
>> > exchange='',
>> > routing_key='rpc_queue',
>> > properties=pika.BasicProperties(
>> >  reply_to=cb_queue,
>> >  ),
>> > body='Hello World!')
>> >
>> > ch, method, props, body = yield from queue.get()
>> > c.inc()
>> >
>> > loop = asyncio.get_event_loop()
>> >
>> > try:
>> > task = asyncio.ensure_future(call(loop))
>> >