No caveats; actually, I think this should be the default (that's the
case, for example, for all TCP sockets in Go; zmq also sets it, as do all
web browsers).



2016-01-18 12:11 GMT+00:00 Yuriy Homyakov <yuriy.homya...@gmail.com>:
> It works! Thank you very much!
>
> I have added
> transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
>
> Are there any caveats or possible problems?
>
> On Monday, January 18, 2016 at 12:06:19 PM UTC+3, Charles-François Natali
> wrote:
>>
>> It's just a random guess, but does asyncio set TCP_NODELAY on the sockets?
>>
>> 2016-01-18 7:33 GMT+00:00 Yuriy Homyakov <yuriy.h...@gmail.com>:
>> > Hi!
>> >
>> > I am trying to implement RPC through RabbitMQ, as described here:
>> > http://www.rabbitmq.com/tutorials/tutorial-six-python.html
>> >
>> > With pika and Twisted I am getting 1000 RPC calls per second; a similar
>> > implementation on asyncio yields only 25 RPC calls per second.
>> > I implemented the asyncio adapter for pika myself - it is just a port of
>> > the Twisted adapter -
>> >
>> > https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py
>> >
>> > RPC server implementation:
>> >
>> >
>> > import sys
>> > import time
>> > import asyncio
>> > import pika
>> >
>> > from pika.adapters import asyncio_connection
>> >
>> >
>> > @asyncio.coroutine
>> > def make_connection(loop, host="localhost", port=5672):
>> >
>> >     def connection_factory():
>> >         params = pika.ConnectionParameters()
>> >         return asyncio_connection.AsyncioProtocolConnection(params,
>> > loop=loop)
>> >
>> >     transport, connection = yield from
>> > loop.create_connection(connection_factory, host, port)
>> >     yield from connection.ready
>> >     return connection
>> >
>> >
>> > @asyncio.coroutine
>> > def server(loop):
>> >     conn = yield from make_connection(loop)
>> >     chan = yield from conn.channel()
>> >
>> >     conn2 = yield from make_connection(loop)
>> >     chan2 = yield from conn2.channel()
>> >     print('Channel', chan)
>> >     yield from chan.queue_declare(queue='rpc_queue')
>> >     queue, ctag = yield from chan.basic_consume(queue='rpc_queue',
>> > no_ack=True)
>> >
>> >     while True:
>> >         ch, method, props, body = yield from queue.get()
>> >         # print('Get', body, props)
>> >         yield from chan2.basic_publish(
>> >             exchange='',
>> >             routing_key=props.reply_to,
>> >             body=body[::-1])
>> >
>> >
>> > loop = asyncio.get_event_loop()
>> >
>> > try:
>> >     task = asyncio.ensure_future(server(loop))
>> >     loop.run_until_complete(task)
>> > except KeyboardInterrupt:
>> >     print('Done')
>> >
>> > RPC client implementation:
>> >
>> >
>> >
>> > import sys
>> > import time
>> > import asyncio
>> > import pika
>> >
>> > from pika.adapters import asyncio_connection
>> >
>> >
>> > class Counter:
>> >
>> >     def __init__(self, name=''):
>> >         self.start = time.time()
>> >         self.cnt = 0
>> >         self.name = name
>> >
>> >     def reset(self):
>> >         self.start = time.time()
>> >         self.cnt = 0
>> >
>> >     def inc(self, value=1):
>> >         self.cnt += value
>> >
>> >     def summary(self):
>> >         now = time.time()
>> >         return {
>> >             'time': now - self.start,
>> >             'count': self.cnt,
>> >             'rate': self.cnt / (now - self.start)
>> >         }
>> >
>> >     def __str__(self):
>> >         return '{name}: time: {time}, count: {count}, rate:
>> > {rate}'.format(
>> >             name=self.name, **self.summary())
>> >
>> >
>> > c = Counter('Counter')
>> >
>> >
>> > @asyncio.coroutine
>> > def make_connection(loop, host="localhost", port=5672):
>> >
>> >     def connection_factory():
>> >         params = pika.ConnectionParameters()
>> >         return asyncio_connection.AsyncioProtocolConnection(params,
>> > loop=loop)
>> >
>> >     transport, connection = yield from
>> > loop.create_connection(connection_factory, host, port)
>> >     yield from connection.ready
>> >     return connection
>> >
>> >
>> > @asyncio.coroutine
>> > def call(loop):
>> >     global c
>> >     conn = yield from make_connection(loop)
>> >     chan = yield from conn.channel()
>> >
>> >     print('Channel', chan)
>> >     result = yield from chan.queue_declare(exclusive=True)
>> >     cb_queue = result.method.queue
>> >     print('CBQ', cb_queue)
>> >     queue, ctag = yield from chan.basic_consume(queue=cb_queue,
>> > no_ack=True)
>> >     c.reset()
>> >
>> >     while True:
>> >         yield from chan.basic_publish(
>> >             exchange='',
>> >             routing_key='rpc_queue',
>> >             properties=pika.BasicProperties(
>> >                  reply_to=cb_queue,
>> >                  ),
>> >             body='Hello World!')
>> >
>> >         ch, method, props, body = yield from queue.get()
>> >         c.inc()
>> >
>> > loop = asyncio.get_event_loop()
>> >
>> > try:
>> >     task = asyncio.ensure_future(call(loop))
>> >     loop.run_until_complete(task)
>> > except KeyboardInterrupt:
>> >     print('Done\n', c)
>> >
>> >
>> > All sources (with the Twisted-based implementation) are here:
>> > https://github.com/appetito/pika_tests
>> >
>> > I can speed up the asyncio RPC implementation by adding a second
>> > connection to RabbitMQ - using one connection for reading and another
>> > for writing - but I don't like this solution.
>> >
>> > What am I doing wrong?
>> >
>> > P.S.
>> > I also tried using https://github.com/polyconseil/aioamqp - with the
>> > same
>> > result
>> >
>> >

Reply via email to