Hi all,

I tried the RPC API in Ryu 3.6. The VRRP sample was very helpful to me.
Thanks a lot! I would like to give some feedback on one point I noticed.

Feedback:
I think the API sits at too low a layer for users. It is hard for a user
to write the equivalent of rpc_manager.RpcVRRPManager on their own.

Proposal:
I would like to propose a higher-layer API. A PoC is attached to this
mail. The API is used as follows.

----------
class SampleRpcApp(RpcAppMixin,  # This app uses RPC
                     app_manager.RyuApp):

    # RPC Server API (two way)
    @request('request_method_name')
    def _request_handler(self, params):
        return {}  # result

    # RPC Server API (one way)
    @notify('notify_method_name')
    def _notify_handler(self, src, params):
        pass  # one way

    ...
        # RPC Client API (two way)
        result = self.send_rpc_request('peer_name', 'method_name', params)
        # RPC Client API (one way)
        self.send_rpc_notify('peer_name', 'method_name', params)

----------
$ ryu-manager sample.py --rpc-listen-ports=SampleRpcApp:50000
----------

Since this idea and code are only a proof of concept, I would appreciate your comments.

Thanks,
Satoshi
import inspect
import socket
import logging

from oslo.config import cfg

from ryu.lib import rpc
from ryu.lib import hub
from ryu.lib.rpc import RPCError

# oslo.config's global configuration object (cfg.CONF).
CONF = cfg.CONF

# Command-line option that maps an application class name to the port its
# RPC server listens on, e.g. --rpc-listen-ports=SampleRpcApp:50000.
# RpcAppMixin looks its own port up by class name in this dict.
CONF.register_cli_opts([
    cfg.DictOpt('rpc-listen-ports', default={},
               help='ports for rpc interface (AppName1:port1,AppName2:port2,...)')])

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

def request(name):
    """Build a decorator marking a method as the handler of RPC request *name*.

    The decorated function is returned unchanged apart from a new
    ``rpc_request_info`` attribute holding ``{'name': name}``.
    """
    def _tag(handler):
        setattr(handler, 'rpc_request_info', dict(name=name))
        return handler

    return _tag


def notify(name):
    """Build a decorator marking a method as the handler of RPC notification *name*.

    The decorated function is returned unchanged apart from a new
    ``rpc_notify_info`` attribute holding ``{'name': name}``.
    """
    def _tag(handler):
        setattr(handler, 'rpc_notify_info', dict(name=name))
        return handler

    return _tag


class Peer(object):
    """One connected RPC peer and the queues its incoming messages go to.

    ``endpoint`` starts as None and is assigned by whoever creates the peer.
    Each ``_handle_*`` method enqueues a ``(peer, data)`` tuple on the
    matching queue so the consumer knows which peer the message came from.
    """

    def __init__(self, name, request_q, response_q, notify_q):
        super(Peer, self).__init__()
        self.name, self.endpoint = name, None
        self.request_q = request_q
        self.response_q = response_q
        self.notify_q = notify_q

    def _handle_request(self, data):
        # Incoming request: hand it to the request queue.
        item = (self, data)
        self.request_q.put(item)

    def _handle_response(self, data):
        # Incoming response: hand it to the response queue.
        item = (self, data)
        self.response_q.put(item)

    def _handle_notify(self, data):
        # Incoming notification: hand it to the notification queue.
        item = (self, data)
        self.notify_q.put(item)

class RpcAppMixin(object):
    """Mixin adding an RPC server and client to an application class.

    Methods carrying an ``rpc_request_info`` / ``rpc_notify_info`` attribute
    (as set by the ``request`` / ``notify`` decorators) are collected at
    construction time and invoked when a peer sends a request or
    notification with the matching method name.

    The listen port is looked up in ``CONF.rpc_listen_ports`` under the
    concrete class name; without an entry no server is started, but
    outgoing connections via ``connect_rpc_peer`` still work.
    """

    def __init__(self, *args, **kwargs):
        super(RpcAppMixin, self).__init__(*args, **kwargs)
        self._rpc_peers = {}
        self._rpc_requests = hub.Queue()
        self._rpc_notifies = hub.Queue()
        # Build the dispatch tables before any worker is spawned so the
        # loops can never run against a partially initialised instance.
        self._rpc_request_callbacks = self._detect_rpc_callbacks(
            'rpc_request_info')
        self._rpc_notify_callbacks = self._detect_rpc_callbacks(
            'rpc_notify_info')
        self._server_thread = hub.spawn(self._peer_accept_thread)
        self._request_thread = hub.spawn(self._rpc_request_loop_thread)
        self._notify_thread = hub.spawn(self._rpc_notify_loop_thread)

    def send_rpc_request(self, peer_name, method_name, params):
        """Send a request to *peer_name* and block until a response arrives.

        *params* is sent wrapped in a one-element list. Returns the next
        item from the peer's response queue, i.e. the ``(peer, data)`` tuple
        enqueued by ``Peer._handle_response``.

        Raises AttributeError if *peer_name* is not a known peer.

        NOTE(review): the response is not matched against the request's
        message id, so overlapping requests to one peer may presumably
        receive each other's responses -- confirm before relying on it.
        """
        peer = self._rpc_peers.get(peer_name)
        peer.endpoint.send_request(method_name, [params])
        return peer.response_q.get()

    def send_rpc_notify(self, peer_name, method_name, params):
        """Send a one-way notification to *peer_name*.

        *params* is sent wrapped in a one-element list.
        Raises AttributeError if *peer_name* is not a known peer.
        """
        peer = self._rpc_peers.get(peer_name)
        peer.endpoint.send_notification(method_name, [params])

    def send_rpc_broadcast(self, method_name, params):
        """Send a notification to every currently registered peer."""
        # Iterate over a snapshot: the peer table can change while we are
        # sending (peers connect/disconnect in other threads).
        for peer_name in list(self._rpc_peers):
            if peer_name not in self._rpc_peers:
                # Peer went away since the snapshot was taken.
                continue
            self.send_rpc_notify(peer_name, method_name, params)

    @property
    def rpc_peers(self):
        """Names of the currently registered peers."""
        return self._rpc_peers.keys()

    def connect_rpc_peer(self, addr):
        """Open a connection to *addr* (``(host, port)``) and register it."""
        new_sock = socket.create_connection(addr)
        self._peer_accept_handler(new_sock, addr)

    def _detect_rpc_callbacks(self, attr_name):
        """Return ``{rpc name: bound method}`` for methods tagged *attr_name*."""
        def _is_tagged(member):
            return inspect.ismethod(member) and hasattr(member, attr_name)

        callbacks = {}
        for _, method in inspect.getmembers(self, _is_tagged):
            callbacks[getattr(method, attr_name)['name']] = method
        return callbacks

    def _rpc_request_loop_thread(self):
        """Serve queued requests forever, answering each with a response."""
        while True:
            (peer, data) = self._rpc_requests.get()
            msgid, target_method, params = data
            method = self._rpc_request_callbacks.get(target_method)
            if not method:
                error = 'Unknown method: %s' % (target_method)
                peer.endpoint.send_response(msgid, error=error, result=None)
                LOG.error(error)
                continue
            error = None
            result = None
            try:
                result = method(params)
            except RPCError as e:
                error = str(e)
                LOG.error(error)
            except Exception as e:
                # A buggy handler must not kill the loop that serves every
                # peer; report the failure to the caller instead.
                error = '%s: %s' % (e.__class__.__name__, e)
                LOG.exception('RPC request handler failed: %s',
                              target_method)
            peer.endpoint.send_response(msgid, error=error, result=result)

    def _rpc_notify_loop_thread(self):
        """Serve queued notifications forever; nothing is sent back."""
        while True:
            (peer, data) = self._rpc_notifies.get()
            target_method, params = data
            method = self._rpc_notify_callbacks.get(target_method)
            if not method:
                error = 'Unknown method: %s' % (target_method)
                LOG.error(error)
                continue
            try:
                method(peer, params)
            except RPCError as e:
                LOG.error(str(e))
            except Exception:
                # Keep the loop alive for the other peers and handlers.
                LOG.exception('RPC notify handler failed: %s',
                              target_method)

    def _peer_accept_handler(self, new_sock, addr):
        """Register a peer for *new_sock* and start serving it.

        The peer is named ``'host:port'`` from *addr*. Requests and
        notifications go to the shared queues; responses go to a queue
        private to the peer.
        """
        name = '%s:%d' % (addr[0], addr[1])
        peer = Peer(name, self._rpc_requests, hub.Queue(), self._rpc_notifies)
        table = {
            rpc.MessageType.REQUEST: peer._handle_request,
            rpc.MessageType.RESPONSE: peer._handle_response,
            rpc.MessageType.NOTIFY: peer._handle_notify,
        }
        peer.endpoint = rpc.EndPoint(new_sock, disp_table=table)
        self._rpc_peers[name] = peer
        hub.spawn(self._peer_loop_thread, peer)

    def _peer_loop_thread(self, peer):
        """Run the peer's endpoint until it stops, then unregister the peer."""
        try:
            peer.endpoint.serve()
        finally:
            # Always unregister, even when serve() raises. Only remove the
            # entry if it still refers to this peer, so a newer connection
            # that reused the name is not evicted.
            if self._rpc_peers.get(peer.name) is peer:
                del self._rpc_peers[peer.name]

    def _peer_accept_thread(self):
        """Listen on the configured port and register every connection.

        Returns without listening when no port is configured for this class
        or the configured value is not an integer.
        """
        rpc_port = CONF.rpc_listen_ports.get(self.__class__.__name__)
        if not rpc_port:
            error = ('No rpc listen port for %s (--rpc-listen-ports)'
                     % self.__class__.__name__)
            LOG.info(error)
            return
        try:
            rpc_port = int(rpc_port)
        except ValueError:
            error = 'rpc listen port must be an integer: %s' % rpc_port
            LOG.info(error)
            return
        server = hub.StreamServer(('', rpc_port),
                                  self._peer_accept_handler)
        server.serve_forever()
------------------------------------------------------------------------------
Android apps run on BlackBerry 10
Introducing the new BlackBerry 10.2.1 Runtime for Android apps.
Now with support for Jelly Bean, Bluetooth, Mapview and more.
Get your Android app in front of a whole new audience.  Start now.
http://pubads.g.doubleclick.net/gampad/clk?id=124407151&iu=/4140/ostg.clktrk
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to