It works! Thank you very much!

I have added
transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)

Are there any caveats or possible problems?

On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali 
wrote:
>
> It's just a random guess, but does asyncio set TCP_NODELAY on the sockets? 
>
> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov <[email protected] 
> <javascript:>>: 
> > Hi! 
> > 
> > I am trying to implement RPC through RabbitMQ, as described here: 
> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html 
> > 
> > With pika and Twisted I am getting 1000 RPC calls per second; a similar 
> > implementation on asyncio yields only 25 RPC calls per second. 
> > I implemented the asyncio adapter for pika myself - it is just a port of 
> > the Twisted adapter - 
> > 
> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>  
> > 
> > RPC server implementation: 
> > 
> > 
> > import sys 
> > import time 
> > import asyncio 
> > import pika 
> > 
> > from pika.adapters import asyncio_connection 
> > 
> > 
> > @asyncio.coroutine 
> > def make_connection(loop, host="localhost", port=5672): 
> > 
> >     def connection_factory(): 
> >         params = pika.ConnectionParameters() 
> >         return asyncio_connection.AsyncioProtocolConnection(params, 
> > loop=loop) 
> > 
> >     transport, connection = yield from 
> > loop.create_connection(connection_factory, host, port) 
> >     yield from connection.ready 
> >     return connection 
> > 
> > 
> > @asyncio.coroutine 
> > def server(loop): 
> >     conn = yield from make_connection(loop) 
> >     chan = yield from conn.channel() 
> > 
> >     conn2 = yield from make_connection(loop) 
> >     chan2 = yield from conn2.channel() 
> >     print('Channel', chan) 
> >     yield from chan.queue_declare(queue='rpc_queue') 
> >     queue, ctag = yield from chan.basic_consume(queue='rpc_queue', 
> > no_ack=True) 
> > 
> >     while True: 
> >         ch, method, props, body = yield from queue.get() 
> >         # print('Get', body, props) 
> >         yield from chan2.basic_publish( 
> >             exchange='', 
> >             routing_key=props.reply_to, 
> >             body=body[::-1]) 
> > 
> > 
> > loop = asyncio.get_event_loop() 
> > 
> > try: 
> >     task = asyncio.ensure_future(server(loop)) 
> >     loop.run_until_complete(task) 
> > except KeyboardInterrupt: 
> >     print('Done') 
> > 
> > RPC client implementation: 
> > 
> > 
> > 
> > import sys 
> > import time 
> > import asyncio 
> > import pika 
> > 
> > from pika.adapters import asyncio_connection 
> > 
> > 
> > class Counter: 
> > 
> >     def __init__(self, name=''): 
> >         self.start = time.time() 
> >         self.cnt = 0 
> >         self.name = name 
> > 
> >     def reset(self): 
> >         self.start = time.time() 
> >         self.cnt = 0 
> > 
> >     def inc(self, value=1): 
> >         self.cnt += value 
> > 
> >     def summary(self): 
> >         now = time.time() 
> >         return { 
> >             'time': now - self.start, 
> >             'count': self.cnt, 
> >             'rate': self.cnt / (now - self.start) 
> >         } 
> > 
> >     def __str__(self): 
> >         return '{name}: time: {time}, count: {count}, rate: 
> {rate}'.format( 
> >             name=self.name, **self.summary()) 
> > 
> > 
> > c = Counter('Counter') 
> > 
> > 
> > @asyncio.coroutine 
> > def make_connection(loop, host="localhost", port=5672): 
> > 
> >     def connection_factory(): 
> >         params = pika.ConnectionParameters() 
> >         return asyncio_connection.AsyncioProtocolConnection(params, 
> > loop=loop) 
> > 
> >     transport, connection = yield from 
> > loop.create_connection(connection_factory, host, port) 
> >     yield from connection.ready 
> >     return connection 
> > 
> > 
> > @asyncio.coroutine 
> > def call(loop): 
> >     global c 
> >     conn = yield from make_connection(loop) 
> >     chan = yield from conn.channel() 
> > 
> >     print('Channel', chan) 
> >     result = yield from chan.queue_declare(exclusive=True) 
> >     cb_queue = result.method.queue 
> >     print('CBQ', cb_queue) 
> >     queue, ctag = yield from chan.basic_consume(queue=cb_queue, 
> no_ack=True) 
> >     c.reset() 
> > 
> >     while True: 
> >         yield from chan.basic_publish( 
> >             exchange='', 
> >             routing_key='rpc_queue', 
> >             properties=pika.BasicProperties( 
> >                  reply_to=cb_queue, 
> >                  ), 
> >             body='Hello World!') 
> > 
> >         ch, method, props, body = yield from queue.get() 
> >         c.inc() 
> > 
> > loop = asyncio.get_event_loop() 
> > 
> > try: 
> >     task = asyncio.ensure_future(call(loop)) 
> >     loop.run_until_complete(task) 
> > except KeyboardInterrupt: 
> >     print('Done\n', c) 
> > 
> > 
> > All sources (including the Twisted-based implementation) are here: 
> > https://github.com/appetito/pika_tests 
> > 
> > I can speed up the asyncio RPC implementation by adding a second 
> > connection to RabbitMQ - using one connection for reading and another 
> > for writing - but I don't like this solution. 
> > 
> > What am I doing wrong? 
> > 
> > P.S. 
> > I also tried using https://github.com/polyconseil/aioamqp - with the 
> > same result 
> > 
> > 
>

Reply via email to