Hi!

I am trying to implement RPC through RabbitMQ, as described
here: http://www.rabbitmq.com/tutorials/tutorial-six-python.html

With pika and Twisted I am getting 1000 RPC calls per second; a similar
implementation on asyncio yields only 25 RPC calls per second.
I implemented the asyncio adapter for pika myself - it is just a port of the
Twisted adapter:
https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py

RPC server implementation:


import sys
import time
import asyncio
import pika

from pika.adapters import asyncio_connection


@asyncio.coroutine
def make_connection(loop, host="localhost", port=5672):
    """Open an AMQP connection to ``host:port`` and wait until it is ready.

    Opens a TCP connection with ``loop.create_connection`` using an
    ``AsyncioProtocolConnection`` (built from default
    ``pika.ConnectionParameters``) as the protocol, then waits on the
    protocol's ``ready`` attribute before returning it.

    :param loop: event loop used to open the connection; also handed to the
        protocol object.
    :param host: broker host name passed to ``loop.create_connection``.
    :param port: broker TCP port passed to ``loop.create_connection``.
    :return: the ``AsyncioProtocolConnection`` instance once it is ready.
    """
    import socket

    def connection_factory():
        params = pika.ConnectionParameters()
        return asyncio_connection.AsyncioProtocolConnection(
            params, loop=loop)

    transport, connection = yield from loop.create_connection(
        connection_factory, host, port)

    # Disable Nagle's algorithm on the underlying socket.
    # NOTE(review): ~25 request/reply round trips per second is ~40 ms each,
    # which matches Nagle interacting with delayed ACKs when a message is
    # written as several small segments -- confirm by measuring.
    sock = transport.get_extra_info('socket')
    if sock is not None:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    yield from connection.ready
    return connection


@asyncio.coroutine
def server(loop):
    """Serve RPC requests from ``rpc_queue``, replying with the reversed body.

    Two connections are opened: one channel consumes requests (with
    ``no_ack=True``) and a channel on the second connection publishes the
    replies to the ``reply_to`` queue named in each request's properties.
    Runs until the coroutine is cancelled or an error is raised.

    :param loop: event loop passed through to ``make_connection``.
    """
    consume_conn = yield from make_connection(loop)
    consume_chan = yield from consume_conn.channel()

    publish_conn = yield from make_connection(loop)
    publish_chan = yield from publish_conn.channel()
    print('Channel', consume_chan)

    yield from consume_chan.queue_declare(queue='rpc_queue')
    requests, _ctag = yield from consume_chan.basic_consume(
        queue='rpc_queue', no_ack=True)

    while True:
        _ch, _method, props, body = yield from requests.get()
        reply = body[::-1]
        yield from publish_chan.basic_publish(
            exchange='', routing_key=props.reply_to, body=reply)

# Script entry point: run the server coroutine on the default event loop
# until it finishes or the user interrupts it with Ctrl-C.
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(server(loop))

try:
    loop.run_until_complete(task)
except KeyboardInterrupt:
    print('Done')

RPC client implementation:



import sys
import time
import asyncio
import pika

from pika.adapters import asyncio_connection


class Counter:
    """Count events and report the average rate since the last reset."""

    def __init__(self, name=''):
        """Start counting from zero now; ``name`` labels the ``str()`` output."""
        self.start = time.time()
        self.cnt = 0
        self.name = name

    def reset(self):
        """Restart the measurement: zero the count and restart the clock."""
        self.start = time.time()
        self.cnt = 0

    def inc(self, value=1):
        """Add ``value`` (default 1) to the count."""
        self.cnt += value

    def summary(self):
        """Return a dict with the elapsed ``time``, ``count`` and ``rate``.

        ``rate`` is ``count / time``. It is reported as ``0.0`` when no
        positive amount of time has elapsed (coarse clock resolution, or the
        wall clock stepped backwards) instead of raising
        ``ZeroDivisionError`` or yielding a negative rate.
        """
        elapsed = time.time() - self.start
        rate = self.cnt / elapsed if elapsed > 0 else 0.0
        return {
            'time': elapsed,
            'count': self.cnt,
            'rate': rate,
        }

    def __str__(self):
        """Format the name and the current summary on one line."""
        return '{name}: time: {time}, count: {count}, rate: {rate}'.format(
            name=self.name, **self.summary())


# Module-wide counter of completed RPC round trips.
c = Counter(name='Counter')


@asyncio.coroutine
def make_connection(loop, host="localhost", port=5672):
    """Open an AMQP connection to ``host:port`` and wait until it is ready.

    Opens a TCP connection with ``loop.create_connection`` using an
    ``AsyncioProtocolConnection`` (built from default
    ``pika.ConnectionParameters``) as the protocol, then waits on the
    protocol's ``ready`` attribute before returning it.

    :param loop: event loop used to open the connection; also handed to the
        protocol object.
    :param host: broker host name passed to ``loop.create_connection``.
    :param port: broker TCP port passed to ``loop.create_connection``.
    :return: the ``AsyncioProtocolConnection`` instance once it is ready.
    """
    import socket

    def connection_factory():
        params = pika.ConnectionParameters()
        return asyncio_connection.AsyncioProtocolConnection(
            params, loop=loop)

    transport, connection = yield from loop.create_connection(
        connection_factory, host, port)

    # Disable Nagle's algorithm on the underlying socket.
    # NOTE(review): ~25 request/reply round trips per second is ~40 ms each,
    # which matches Nagle interacting with delayed ACKs when a message is
    # written as several small segments -- confirm by measuring.
    sock = transport.get_extra_info('socket')
    if sock is not None:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    yield from connection.ready
    return connection


@asyncio.coroutine
def call(loop):
    """Issue RPC requests to ``rpc_queue`` one at a time, forever.

    Declares an exclusive callback queue, consumes from it with
    ``no_ack=True``, then repeatedly publishes ``'Hello World!'`` with
    ``reply_to`` set to the callback queue and waits for one reply before
    sending the next request. Each completed round trip increments the
    module-level counter ``c``, which is reset just before the loop starts.

    :param loop: event loop passed through to ``make_connection``.
    """
    connection = yield from make_connection(loop)
    channel = yield from connection.channel()
    print('Channel', channel)

    declared = yield from channel.queue_declare(exclusive=True)
    cb_queue = declared.method.queue
    print('CBQ', cb_queue)

    replies, _ctag = yield from channel.basic_consume(
        queue=cb_queue, no_ack=True)
    c.reset()

    while True:
        request_props = pika.BasicProperties(reply_to=cb_queue)
        yield from channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=request_props,
            body='Hello World!')

        # The reply content is ignored; only its arrival is counted.
        _ch, _method, _props, _body = yield from replies.get()
        c.inc()

# Script entry point: run the client coroutine on the default event loop;
# on Ctrl-C, print the counter summary collected so far.
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(call(loop))

try:
    loop.run_until_complete(task)
except KeyboardInterrupt:
    print('Done\n', c)


All sources (including the Twisted-based implementation) are
here: https://github.com/appetito/pika_tests

I can speed up the asyncio RPC implementation by adding a second connection to
RabbitMQ - using one connection for reading and another connection for
writing - but I don't like this solution.

What am I doing wrong?

P.S.
I also tried using https://github.com/polyconseil/aioamqp - with the same
result.

 

Reply via email to