It's just a random guess, but does asyncio set TCP_NODELAY on the sockets?

2016-01-18 7:33 GMT+00:00 Yuriy Homyakov <[email protected]>:
> Hi!
>
> I am trying to implement RPC through RabbitMQ, as described here:
> http://www.rabbitmq.com/tutorials/tutorial-six-python.html
>
> With pika and Twisted I am getting 1000 RPC calls per second, while a
> similar implementation on asyncio yields only 25 RPC calls per second.
> I implemented the asyncio adapter for pika myself - it is just a port of
> the Twisted adapter:
> https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>
> RPC server implementation:
>
>
> import sys
> import time
> import asyncio
> import pika
>
> from pika.adapters import asyncio_connection
>
>
> @asyncio.coroutine
> def make_connection(loop, host="localhost", port=5672):
>
>     def connection_factory():
>         params = pika.ConnectionParameters()
>         return asyncio_connection.AsyncioProtocolConnection(params,
> loop=loop)
>
>     transport, connection = yield from
> loop.create_connection(connection_factory, host, port)
>     yield from connection.ready
>     return connection
>
>
> @asyncio.coroutine
> def server(loop):
>     conn = yield from make_connection(loop)
>     chan = yield from conn.channel()
>
>     conn2 = yield from make_connection(loop)
>     chan2 = yield from conn2.channel()
>     print('Channel', chan)
>     yield from chan.queue_declare(queue='rpc_queue')
>     queue, ctag = yield from chan.basic_consume(queue='rpc_queue',
> no_ack=True)
>
>     while True:
>         ch, method, props, body = yield from queue.get()
>         # print('Get', body, props)
>         yield from chan2.basic_publish(
>             exchange='',
>             routing_key=props.reply_to,
>             body=body[::-1])
>
>
> loop = asyncio.get_event_loop()
>
> try:
>     task = asyncio.ensure_future(server(loop))
>     loop.run_until_complete(task)
> except KeyboardInterrupt:
>     print('Done')
>
> RPC client implementation:
>
>
>
> import sys
> import time
> import asyncio
> import pika
>
> from pika.adapters import asyncio_connection
>
>
> class Counter:
>
>     def __init__(self, name=''):
>         self.start = time.time()
>         self.cnt = 0
>         self.name = name
>
>     def reset(self):
>         self.start = time.time()
>         self.cnt = 0
>
>     def inc(self, value=1):
>         self.cnt += value
>
>     def summary(self):
>         now = time.time()
>         return {
>             'time': now - self.start,
>             'count': self.cnt,
>             'rate': self.cnt / (now - self.start)
>         }
>
>     def __str__(self):
>         return '{name}: time: {time}, count: {count}, rate: {rate}'.format(
>             name=self.name, **self.summary())
>
>
> c = Counter('Counter')
>
>
> @asyncio.coroutine
> def make_connection(loop, host="localhost", port=5672):
>
>     def connection_factory():
>         params = pika.ConnectionParameters()
>         return asyncio_connection.AsyncioProtocolConnection(params,
> loop=loop)
>
>     transport, connection = yield from
> loop.create_connection(connection_factory, host, port)
>     yield from connection.ready
>     return connection
>
>
> @asyncio.coroutine
> def call(loop):
>     global c
>     conn = yield from make_connection(loop)
>     chan = yield from conn.channel()
>
>     print('Channel', chan)
>     result = yield from chan.queue_declare(exclusive=True)
>     cb_queue = result.method.queue
>     print('CBQ', cb_queue)
>     queue, ctag = yield from chan.basic_consume(queue=cb_queue, no_ack=True)
>     c.reset()
>
>     while True:
>         yield from chan.basic_publish(
>             exchange='',
>             routing_key='rpc_queue',
>             properties=pika.BasicProperties(
>                  reply_to=cb_queue,
>                  ),
>             body='Hello World!')
>
>         ch, method, props, body = yield from queue.get()
>         c.inc()
>
> loop = asyncio.get_event_loop()
>
> try:
>     task = asyncio.ensure_future(call(loop))
>     loop.run_until_complete(task)
> except KeyboardInterrupt:
>     print('Done\n', c)
>
>
> All sources (including the Twisted-based implementation) are here:
> https://github.com/appetito/pika_tests
>
> I can speed up the asyncio RPC implementation by adding a second connection
> to RabbitMQ - using one connection for reading and another connection for
> writing - but I don't like this solution.
>
> What am I doing wrong?
>
> P.S.
> I also tried using https://github.com/polyconseil/aioamqp - with the same
> result.
>
>

Reply via email to