https://pypi.python.org/pypi/tinyrpc/0.5 https://github.com/mbr/tinyrpc
Signed-off-by: Yoshihiro Kaneko <[email protected]> --- ryu/contrib/tinyrpc/__init__.py | 6 + ryu/contrib/tinyrpc/client.py | 91 ++++++++ ryu/contrib/tinyrpc/dispatch/__init__.py | 201 +++++++++++++++++ ryu/contrib/tinyrpc/exc.py | 40 ++++ ryu/contrib/tinyrpc/protocols/__init__.py | 173 +++++++++++++++ ryu/contrib/tinyrpc/protocols/jsonrpc.py | 291 +++++++++++++++++++++++++ ryu/contrib/tinyrpc/server/__init__.py | 71 ++++++ ryu/contrib/tinyrpc/server/gevent.py | 13 ++ ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py | 115 ++++++++++ ryu/contrib/tinyrpc/transports/__init__.py | 52 +++++ ryu/contrib/tinyrpc/transports/http.py | 31 +++ ryu/contrib/tinyrpc/transports/tcp.py | 52 +++++ ryu/contrib/tinyrpc/transports/wsgi.py | 90 ++++++++ ryu/contrib/tinyrpc/transports/zmq.py | 76 +++++++ tools/pip-requires | 1 - 15 files changed, 1302 insertions(+), 1 deletion(-) create mode 100644 ryu/contrib/tinyrpc/__init__.py create mode 100644 ryu/contrib/tinyrpc/client.py create mode 100644 ryu/contrib/tinyrpc/dispatch/__init__.py create mode 100644 ryu/contrib/tinyrpc/exc.py create mode 100644 ryu/contrib/tinyrpc/protocols/__init__.py create mode 100644 ryu/contrib/tinyrpc/protocols/jsonrpc.py create mode 100644 ryu/contrib/tinyrpc/server/__init__.py create mode 100644 ryu/contrib/tinyrpc/server/gevent.py create mode 100644 ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py create mode 100644 ryu/contrib/tinyrpc/transports/__init__.py create mode 100644 ryu/contrib/tinyrpc/transports/http.py create mode 100644 ryu/contrib/tinyrpc/transports/tcp.py create mode 100644 ryu/contrib/tinyrpc/transports/wsgi.py create mode 100644 ryu/contrib/tinyrpc/transports/zmq.py diff --git a/ryu/contrib/tinyrpc/__init__.py b/ryu/contrib/tinyrpc/__init__.py new file mode 100644 index 0000000..f24deb2 --- /dev/null +++ b/ryu/contrib/tinyrpc/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .protocols import * +from .exc import * +from .client import * diff --git 
a/ryu/contrib/tinyrpc/client.py b/ryu/contrib/tinyrpc/client.py new file mode 100644 index 0000000..0d77547 --- /dev/null +++ b/ryu/contrib/tinyrpc/client.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .exc import RPCError + + +class RPCClient(object): + """Client for making RPC calls to connected servers. + + :param protocol: An :py:class:`~tinyrpc.RPCProtocol` instance. + :param transport: A :py:class:`~tinyrpc.transports.ClientTransport` + instance. + """ + + def __init__(self, protocol, transport): + self.protocol = protocol + self.transport = transport + + def _send_and_handle_reply(self, req): + # sends and waits for reply + reply = self.transport.send_message(req.serialize()) + + response = self.protocol.parse_reply(reply) + + if hasattr(response, 'error'): + raise RPCError('Error calling remote procedure: %s' %\ + response.error) + + return response + + def call(self, method, args, kwargs, one_way=False): + """Calls the requested method and returns the result. + + If an error occured, an :py:class:`~tinyrpc.exc.RPCError` instance + is raised. + + :param method: Name of the method to call. + :param args: Arguments to pass to the method. + :param kwargs: Keyword arguments to pass to the method. + :param one_way: Whether or not a reply is desired. + """ + req = self.protocol.create_request(method, args, kwargs, one_way) + + return self._send_and_handle_reply(req).result + + def get_proxy(self, prefix='', one_way=False): + """Convenience method for creating a proxy. + + :param prefix: Passed on to :py:class:`~tinyrpc.client.RPCProxy`. + :param one_way: Passed on to :py:class:`~tinyrpc.client.RPCProxy`. 
+ :return: :py:class:`~tinyrpc.client.RPCProxy` instance.""" + return RPCProxy(self, prefix, one_way) + + def batch_call(self, calls): + """Experimental, use at your own peril.""" + req = self.protocol.create_batch_request() + + for call_args in calls: + req.append(self.protocol.create_request(*call_args)) + + return self._send_and_handle_reply(req) + + +class RPCProxy(object): + """Create a new remote proxy object. + + Proxies allow calling of methods through a simpler interface. See the + documentation for an example. + + :param client: An :py:class:`~tinyrpc.client.RPCClient` instance. + :param prefix: Prefix to prepend to every method name. + :param one_way: Passed to every call of + :py:func:`~tinyrpc.client.call`. + """ + + def __init__(self, client, prefix='', one_way=False): + self.client = client + self.prefix = prefix + self.one_way = one_way + + def __getattr__(self, name): + """Returns a proxy function that, when called, will call a function + name ``name`` on the client associated with the proxy. + """ + proxy_func = lambda *args, **kwargs: self.client.call( + self.prefix + name, + args, + kwargs, + one_way=self.one_way + ) + return proxy_func diff --git a/ryu/contrib/tinyrpc/dispatch/__init__.py b/ryu/contrib/tinyrpc/dispatch/__init__.py new file mode 100644 index 0000000..ec722e4 --- /dev/null +++ b/ryu/contrib/tinyrpc/dispatch/__init__.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import inspect + +from ..exc import * + + +def public(name=None): + """Set RPC name on function. + + This function decorator will set the ``_rpc_public_name`` attribute on a + function, causing it to be picked up if an instance of its parent class is + registered using + :py:func:`~tinyrpc.dispatch.RPCDispatcher.register_instance`. + + ``@public`` is a shortcut for ``@public()``. + + :param name: The name to register the function with. 
+ """ + # called directly with function + if callable(name): + f = name + f._rpc_public_name = f.__name__ + return f + + def _(f): + f._rpc_public_name = name or f.__name__ + return f + + return _ + + +class RPCDispatcher(object): + """Stores name-to-method mappings.""" + + def __init__(self): + self.method_map = {} + self.subdispatchers = {} + + def add_subdispatch(self, dispatcher, prefix=''): + """Adds a subdispatcher, possibly in its own namespace. + + :param dispatcher: The dispatcher to add as a subdispatcher. + :param prefix: A prefix. All of the new subdispatchers methods will be + available as prefix + their original name. + """ + self.subdispatchers.setdefault(prefix, []).append(dispatcher) + + def add_method(self, f, name=None): + """Add a method to the dispatcher. + + :param f: Callable to be added. + :param name: Name to register it with. If ``None``, ``f.__name__`` will + be used. + """ + assert callable(f), "method argument must be callable" + # catches a few programming errors that are + # commonly silently swallowed otherwise + if not name: + name = f.__name__ + + if name in self.method_map: + raise RPCError('Name %s already registered') + + self.method_map[name] = f + + def dispatch(self, request): + """Fully handle request. + + The dispatch method determines which method to call, calls it and + returns a response containing a result. + + No exceptions will be thrown, rather, every exception will be turned + into a response using :py:func:`~tinyrpc.RPCRequest.error_respond`. + + If a method isn't found, a :py:exc:`~tinyrpc.exc.MethodNotFoundError` + response will be returned. If any error occurs outside of the requested + method, a :py:exc:`~tinyrpc.exc.ServerError` without any error + information will be returend. + + If the method is found and called but throws an exception, the + exception thrown is used as a response instead. 
This is the only case + in which information from the exception is possibly propagated back to + the client, as the exception is part of the requested method. + + :py:class:`~tinyrpc.RPCBatchRequest` instances are handled by handling + all its children in order and collecting the results, then returning an + :py:class:`~tinyrpc.RPCBatchResponse` with the results. + + :param request: An :py:func:`~tinyrpc.RPCRequest`. + :return: An :py:func:`~tinyrpc.RPCResponse`. + """ + if hasattr(request, 'create_batch_response'): + results = [self._dispatch(req) for req in request] + + response = request.create_batch_response() + if response != None: + response.extend(results) + + return response + else: + return self._dispatch(request) + + def _dispatch(self, request): + try: + try: + method = self.get_method(request.method) + except KeyError as e: + return request.error_respond(MethodNotFoundError(e)) + + # we found the method + try: + result = method(*request.args, **request.kwargs) + except Exception as e: + # an error occured within the method, return it + return request.error_respond(e) + + # respond with result + return request.respond(result) + except Exception as e: + # unexpected error, do not let client know what happened + return request.error_respond(ServerError()) + + def get_method(self, name): + """Retrieve a previously registered method. + + Checks if a method matching ``name`` has been registered. + + If :py:func:`get_method` cannot find a method, every subdispatcher + with a prefix matching the method name is checked as well. + + If a method isn't found, a :py:class:`KeyError` is thrown. + + :param name: Callable to find. + :param return: The callable. 
+ """ + if name in self.method_map: + return self.method_map[name] + + for prefix, subdispatchers in self.subdispatchers.iteritems(): + if name.startswith(prefix): + for sd in subdispatchers: + try: + return sd.get_method(name[len(prefix):]) + except KeyError: + pass + + raise KeyError(name) + + def public(self, name=None): + """Convenient decorator. + + Allows easy registering of functions to this dispatcher. Example: + + .. code-block:: python + + dispatch = RPCDispatcher() + + @dispatch.public + def foo(bar): + # ... + + class Baz(object): + def not_exposed(self): + # ... + + @dispatch.public(name='do_something') + def visible_method(arg1) + # ... + + :param name: Name to register callable with + """ + if callable(name): + self.add_method(name) + return name + + def _(f): + self.add_method(f, name=name) + return f + + return _ + + def register_instance(self, obj, prefix=''): + """Create new subdispatcher and register all public object methods on + it. + + To be used in conjunction with the :py:func:`tinyrpc.dispatch.public` + decorator (*not* :py:func:`tinyrpc.dispatch.RPCDispatcher.public`). + + :param obj: The object whose public methods should be made available. + :param prefix: A prefix for the new subdispatcher. 
+ """ + dispatch = self.__class__() + for name, f in inspect.getmembers( + obj, lambda f: callable(f) and hasattr(f, '_rpc_public_name') + ): + dispatch.add_method(f, f._rpc_public_name) + + # add to dispatchers + self.add_subdispatch(dispatch, prefix) diff --git a/ryu/contrib/tinyrpc/exc.py b/ryu/contrib/tinyrpc/exc.py new file mode 100644 index 0000000..0c57284 --- /dev/null +++ b/ryu/contrib/tinyrpc/exc.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +class RPCError(Exception): + """Base class for all excetions thrown by :py:mod:`tinyrpc`.""" + + +class BadRequestError(RPCError): + """Base class for all errors that caused the processing of a request to + abort before a request object could be instantiated.""" + + def error_respond(self): + """Create :py:class:`~tinyrpc.RPCErrorResponse` to respond the error. + + :return: A error responce instance or ``None``, if the protocol decides + to drop the error silently.""" + raise RuntimeError('Not implemented') + + +class BadReplyError(RPCError): + """Base class for all errors that caused processing of a reply to abort + before it could be turned in a response object.""" + + +class InvalidRequestError(BadRequestError): + """A request made was malformed (i.e. violated the specification) and could + not be parsed.""" + + +class InvalidReplyError(BadReplyError): + """A reply received was malformed (i.e. violated the specification) and + could not be parsed into a response.""" + + +class MethodNotFoundError(RPCError): + """The desired method was not found.""" + + +class ServerError(RPCError): + """An internal error in the RPC system occured.""" diff --git a/ryu/contrib/tinyrpc/protocols/__init__.py b/ryu/contrib/tinyrpc/protocols/__init__.py new file mode 100644 index 0000000..9ad55b9 --- /dev/null +++ b/ryu/contrib/tinyrpc/protocols/__init__.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python + +from ..exc import * + +class RPCRequest(object): + unique_id = None + """A unique ID to remember the request by. 
Protocol specific, may or + may not be set. This value should only be set by + :py:func:`~tinyrpc.RPCProtocol.create_request`. + + The ID allows client to receive responses out-of-order and still allocate + them to the correct request. + + Only supported if the parent protocol has + :py:attr:`~tinyrpc.RPCProtocol.supports_out_of_order` set to ``True``. + """ + + method = None + """The name of the method to be called.""" + + args = [] + """The positional arguments of the method call.""" + + kwargs = {} + """The keyword arguments of the method call.""" + + def error_respond(self, error): + """Creates an error response. + + Create a response indicating that the request was parsed correctly, + but an error has occured trying to fulfill it. + + :param error: An exception or a string describing the error. + + :return: A response or ``None`` to indicate that no error should be sent + out. + """ + raise NotImplementedError() + + def respond(self, result): + """Create a response. + + Call this to return the result of a successful method invocation. + + This creates and returns an instance of a protocol-specific subclass of + :py:class:`~tinyrpc.RPCResponse`. + + :param result: Passed on to new response instance. + + :return: A response or ``None`` to indicate this request does not expect a + response. + """ + raise NotImplementedError() + + def serialize(self): + """Returns a serialization of the request. + + :return: A string to be passed on to a transport. + """ + raise NotImplementedError() + + +class RPCBatchRequest(list): + """Multiple requests batched together. + + A batch request is a subclass of :py:class:`list`. Protocols that support + multiple requests in a single message use this to group them together. + + Handling a batch requests is done in any order, responses must be gathered + in a batch response and be in the same order as their respective requests. 
+ + Any item of a batch request is either a request or a subclass of + :py:class:`~tinyrpc.BadRequestError`, which indicates that there has been + an error in parsing the request. + """ + + def create_batch_response(self): + """Creates a response suitable for responding to this request. + + :return: An :py:class:`~tinyrpc.RPCBatchResponse` or ``None``, if no + response is expected.""" + raise NotImplementedError() + + def serialize(self): + raise NotImplementedError() + + +class RPCResponse(object): + """RPC call response class. + + Base class for all deriving responses. + + Has an attribute ``result`` containing the result of the RPC call, unless + an error occured, in which case an attribute ``error`` will contain the + error message.""" + + unique_id = None + + def serialize(self): + """Returns a serialization of the response. + + :return: A reply to be passed on to a transport. + """ + raise NotImplementedError() + + +class RPCErrorResponse(RPCResponse): + pass + + +class RPCBatchResponse(list): + """Multiple response from a batch request. See + :py:class:`~tinyrpc.RPCBatchRequest` on how to handle. + + Items in a batch response need to be + :py:class:`~tinyrpc.RPCResponse` instances or None, meaning no reply should + generated for the request. + """ + + def serialize(self): + """Returns a serialization of the batch response.""" + raise NotImplementedError() + + +class RPCProtocol(object): + """Base class for all protocol implementations.""" + + supports_out_of_order = False + """If true, this protocol can receive responses out of order correctly. + + Note that this usually depends on the generation of unique_ids, the + generation of these may or may not be thread safe, depending on the + protocol. Ideally, only once instance of RPCProtocol should be used per + client.""" + + def create_request(self, method, args=None, kwargs=None, one_way=False): + """Creates a new RPCRequest object. 
+ + It is up to the implementing protocol whether or not ``args``, + ``kwargs``, one of these, both at once or none of them are supported. + + :param method: The method name to invoke. + :param args: The positional arguments to call the method with. + :param kwargs: The keyword arguments to call the method with. + :param one_way: The request is an update, i.e. it does not expect a + reply. + :return: A new :py:class:`~tinyrpc.RPCRequest` instance. + """ + raise NotImplementedError() + + def parse_request(self, data): + """Parses a request given as a string and returns an + :py:class:`RPCRequest` instance. + + :return: An instanced request. + """ + raise NotImplementedError() + + def parse_reply(self, data): + """Parses a reply and returns an :py:class:`RPCResponse` instance. + + :return: An instanced response. + """ + raise NotImplementedError() + + +class RPCBatchProtocol(RPCProtocol): + def create_batch_request(self, requests=None): + """Create a new :py:class:`tinyrpc.RPCBatchRequest` object. + + :param requests: A list of requests. + """ + raise NotImplementedError() diff --git a/ryu/contrib/tinyrpc/protocols/jsonrpc.py b/ryu/contrib/tinyrpc/protocols/jsonrpc.py new file mode 100644 index 0000000..941da51 --- /dev/null +++ b/ryu/contrib/tinyrpc/protocols/jsonrpc.py @@ -0,0 +1,291 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .. 
import RPCBatchProtocol, RPCRequest, RPCResponse, RPCErrorResponse,\ + InvalidRequestError, MethodNotFoundError, ServerError,\ + InvalidReplyError, RPCError, RPCBatchRequest, RPCBatchResponse + +import json + +class FixedErrorMessageMixin(object): + def __init__(self, *args, **kwargs): + if not args: + args = [self.message] + super(FixedErrorMessageMixin, self).__init__(*args, **kwargs) + + def error_respond(self): + response = JSONRPCErrorResponse() + + response.error = self.message + response.unique_id = None + response._jsonrpc_error_code = self.jsonrpc_error_code + return response + + + +class JSONRPCParseError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32700 + message = 'Parse error' + + +class JSONRPCInvalidRequestError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32600 + message = 'Invalid Request' + + +class JSONRPCMethodNotFoundError(FixedErrorMessageMixin, MethodNotFoundError): + jsonrpc_error_code = -32601 + message = 'Method not found' + + +class JSONRPCInvalidParamsError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32602 + message = 'Invalid params' + + +class JSONRPCInternalError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32603 + message = 'Internal error' + + +class JSONRPCServerError(FixedErrorMessageMixin, InvalidRequestError): + jsonrpc_error_code = -32000 + message = '' + + +class JSONRPCSuccessResponse(RPCResponse): + def _to_dict(self): + return { + 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION, + 'id': self.unique_id, + 'result': self.result, + } + + def serialize(self): + return json.dumps(self._to_dict()) + + +class JSONRPCErrorResponse(RPCErrorResponse): + def _to_dict(self): + return { + 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION, + 'id': self.unique_id, + 'error': { + 'message': str(self.error), + 'code': self._jsonrpc_error_code, + } + } + + def serialize(self): + return json.dumps(self._to_dict()) + + +def _get_code_and_message(error): 
+ assert isinstance(error, (Exception, basestring)) + if isinstance(error, Exception): + if hasattr(error, 'jsonrpc_error_code'): + code = error.jsonrpc_error_code + msg = str(error) + elif isinstance(error, InvalidRequestError): + code = JSONRPCInvalidRequestError.jsonrpc_error_code + msg = JSONRPCInvalidRequestError.message + elif isinstance(error, MethodNotFoundError): + code = JSONRPCMethodNotFoundError.jsonrpc_error_code + msg = JSONRPCMethodNotFoundError.message + else: + # allow exception message to propagate + code = JSONRPCServerError.jsonrpc_error_code + msg = str(error) + else: + code = -32000 + msg = error + + return code, msg + + +class JSONRPCRequest(RPCRequest): + def error_respond(self, error): + if not self.unique_id: + return None + + response = JSONRPCErrorResponse() + + code, msg = _get_code_and_message(error) + + response.error = msg + response.unique_id = self.unique_id + response._jsonrpc_error_code = code + return response + + def respond(self, result): + response = JSONRPCSuccessResponse() + + if not self.unique_id: + return None + + response.result = result + response.unique_id = self.unique_id + + return response + + def _to_dict(self): + jdata = { + 'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION, + 'method': self.method, + } + if self.args: + jdata['params'] = self.args + if self.kwargs: + jdata['params'] = self.kwargs + if self.unique_id != None: + jdata['id'] = self.unique_id + return jdata + + def serialize(self): + return json.dumps(self._to_dict()) + + +class JSONRPCBatchRequest(RPCBatchRequest): + def create_batch_response(self): + if self._expects_response(): + return JSONRPCBatchResponse() + + def _expects_response(self): + for request in self: + if isinstance(request, Exception): + return True + if request.unique_id != None: + return True + + return False + + def serialize(self): + return json.dumps([req._to_dict() for req in self]) + + +class JSONRPCBatchResponse(RPCBatchResponse): + def serialize(self): + return 
json.dumps([resp._to_dict() for resp in self if resp != None]) + + +class JSONRPCProtocol(RPCBatchProtocol): + """JSONRPC protocol implementation. + + Currently, only version 2.0 is supported.""" + + JSON_RPC_VERSION = "2.0" + _ALLOWED_REPLY_KEYS = sorted(['id', 'jsonrpc', 'error', 'result']) + _ALLOWED_REQUEST_KEYS = sorted(['id', 'jsonrpc', 'method', 'params']) + + def __init__(self, *args, **kwargs): + super(JSONRPCProtocol, self).__init__(*args, **kwargs) + self._id_counter = 0 + + def _get_unique_id(self): + self._id_counter += 1 + return self._id_counter + + def create_batch_request(self, requests=None): + return JSONRPCBatchRequest(requests or []) + + def create_request(self, method, args=None, kwargs=None, one_way=False): + if args and kwargs: + raise InvalidRequestError('Does not support args and kwargs at '\ + 'the same time') + + request = JSONRPCRequest() + + if not one_way: + request.unique_id = self._get_unique_id() + + request.method = method + request.args = args + request.kwargs = kwargs + + return request + + def parse_reply(self, data): + try: + rep = json.loads(data) + except Exception as e: + raise InvalidReplyError(e) + + for k in rep.iterkeys(): + if not k in self._ALLOWED_REPLY_KEYS: + raise InvalidReplyError('Key not allowed: %s' % k) + + if not 'jsonrpc' in rep: + raise InvalidReplyError('Missing jsonrpc (version) in response.') + + if rep['jsonrpc'] != self.JSON_RPC_VERSION: + raise InvalidReplyError('Wrong JSONRPC version') + + if not 'id' in rep: + raise InvalidReplyError('Missing id in response') + + if ('error' in rep) == ('result' in rep): + raise InvalidReplyError( + 'Reply must contain exactly one of result and error.' 
+ ) + + if 'error' in rep: + response = JSONRPCErrorResponse() + error = rep['error'] + response.error = error['message'] + response._jsonrpc_error_code = error['code'] + else: + response = JSONRPCSuccessResponse() + response.result = rep.get('result', None) + + response.unique_id = rep['id'] + + return response + + def parse_request(self, data): + try: + req = json.loads(data) + except Exception as e: + raise JSONRPCParseError() + + if isinstance(req, list): + # batch request + requests = JSONRPCBatchRequest() + for subreq in req: + try: + requests.append(self._parse_subrequest(subreq)) + except RPCError as e: + requests.append(e) + except Exception as e: + requests.append(JSONRPCInvalidRequestError()) + + if not requests: + raise JSONRPCInvalidRequestError() + return requests + else: + return self._parse_subrequest(req) + + def _parse_subrequest(self, req): + for k in req.iterkeys(): + if not k in self._ALLOWED_REQUEST_KEYS: + raise JSONRPCInvalidRequestError() + + if req.get('jsonrpc', None) != self.JSON_RPC_VERSION: + raise JSONRPCInvalidRequestError() + + if not isinstance(req['method'], basestring): + raise JSONRPCInvalidRequestError() + + request = JSONRPCRequest() + request.method = str(req['method']) + request.unique_id = req.get('id', None) + + params = req.get('params', None) + if params != None: + if isinstance(params, list): + request.args = req['params'] + elif isinstance(params, dict): + request.kwargs = req['params'] + else: + raise JSONRPCInvalidParamsError() + + return request diff --git a/ryu/contrib/tinyrpc/server/__init__.py b/ryu/contrib/tinyrpc/server/__init__.py new file mode 100644 index 0000000..6b2cc1a --- /dev/null +++ b/ryu/contrib/tinyrpc/server/__init__.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# FIXME: needs unittests +# FIXME: needs checks for out-of-order, concurrency, etc as attributes +from tinyrpc.exc import RPCError + +class RPCServer(object): + """High level RPC server. 
+ + :param transport: The :py:class:`~tinyrpc.transports.RPCTransport` to use. + :param protocol: The :py:class:`~tinyrpc.RPCProtocol` to use. + :param dispatcher: The :py:class:`~tinyrpc.dispatch.RPCDispatcher` to use. + """ + def __init__(self, transport, protocol, dispatcher): + self.transport = transport + self.protocol = protocol + self.dispatcher = dispatcher + + def serve_forever(self): + """Handle requests forever. + + Starts the server loop in which the transport will be polled for a new + message. + + After a new message has arrived, + :py:func:`~tinyrpc.server.RPCServer._spawn` is called with a handler + function and arguments to handle the request. + + The handler function will try to decode the message using the supplied + protocol, if that fails, an error response will be sent. After decoding + the message, the dispatcher will be asked to handle the resultung + request and the return value (either an error or a result) will be sent + back to the client using the transport. + + After calling :py:func:`~tinyrpc.server.RPCServer._spawn`, the server + will fetch the next message and repeat. + """ + while True: + context, message = self.transport.receive_message() + + # assuming protocol is threadsafe and dispatcher is theadsafe, as + # long as its immutable + + def handle_message(context, message): + try: + request = self.protocol.parse_request(message) + except RPCError as e: + response = e.error_respond() + else: + response = self.dispatcher.dispatch(request) + + # send reply + self.transport.send_reply(context, response.serialize()) + + self._spawn(handle_message, context, message) + + def _spawn(self, func, *args, **kwargs): + """Spawn a handler function. + + This function is overridden in subclasses to provide concurrency. + + In the base implementation, it simply calls the supplied function + ``func`` with ``*args`` and ``**kwargs``. This results in a + single-threaded, single-process, synchronous server. + + :param func: A callable to call. 
+ :param args: Arguments to ``func``. + :param kwargs: Keyword arguments to ``func``. + """ + func(*args, **kwargs) diff --git a/ryu/contrib/tinyrpc/server/gevent.py b/ryu/contrib/tinyrpc/server/gevent.py new file mode 100644 index 0000000..c1078fc --- /dev/null +++ b/ryu/contrib/tinyrpc/server/gevent.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from __future__ import absolute_import +import gevent + +from . import RPCServer + + +class RPCServerGreenlets(RPCServer): + # documentation in docs because of dependencies + def _spawn(self, func, *args, **kwargs): + gevent.spawn(func, *args, **kwargs) diff --git a/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py b/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py new file mode 100644 index 0000000..a574d97 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/INTEGRATE_ME.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import gevent +import zmq.green as zmq +from logbook import Logger + +from tinyrpc.protocols.jsonrpc import JSONRPCProtocol +from tinyrpc.dispatch import RPCDispatcher +from tinyrpc import RPCError, ServerError, MethodNotFoundError + + +class Server(object): + def __init__(transport, protocol, dispatcher): + self.transport = transport + self.protocol = protocol + self.dispatcher = dispatcher + + def run(self): + while True: + try: + context, message = self.transport.receive_message() + except Exception as e: + self.exception(e) + continue + + # assuming protocol is threadsafe and dispatcher is theadsafe, as long + # as its immutable + + self.handle_client(context, message) + + def handle_client(self, context, message): + try: + request = self.protocol.parse_request(message) + except RPCError as e: + self.exception(e) + response = e.error_respond() + else: + response = dispatcher.dispatch(request) + + # send reply + reply = response.serialize() + self.transport.send_reply(context, reply) + + +class ConcurrentServerMixin(object): + def handle_client(self, context, 
message): + self.spawn( + super(ConcurrentServer, self).handle_client, context, message + ) + + +class ZmqRouterTransport(object): + def __init__(self, socket): + self.socket = socket + + def receive_message(self): + msg = socket.recv_multipart() + return msg[:-1], [-1] + + def send_reply(self, context, reply): + self.send_multipart(context + [reply]) + + +class GeventConcurrencyMixin(ConcurrentServerMixin): + def spawn(self, func, *args, **kwargs): + gevent.spawn(func, *args, **kwargs) + + +def rpc_server(socket, protocol, dispatcher): + log = Logger('rpc_server') + log.debug('starting up...') + while True: + try: + message = socket.recv_multipart() + except Exception as e: + log.warning('Failed to receive message from client, ignoring...') + log.exception(e) + continue + + log.debug('Received message %s from %r' % (message[-1], message[0])) + + # assuming protocol is threadsafe and dispatcher is theadsafe, as long + # as its immutable + + def handle_client(message): + try: + request = protocol.parse_request(message[-1]) + except RPCError as e: + log.exception(e) + response = e.error_respond() + else: + response = dispatcher.dispatch(request) + log.debug('Response okay: %r' % response) + + # send reply + message[-1] = response.serialize() + log.debug('Replying %s to %r' % (message[-1], message[0])) + socket.send_multipart(message) + + gevent.spawn(handle_client, message) + + +context = zmq.Context() +socket = context.socket(zmq.ROUTER) +socket.bind("tcp://127.0.0.1:12345") + +dispatcher = RPCDispatcher() + +@dispatcher.public +def throw_up(): + return 'asad' + raise Exception('BLARGH') + +rpc_server(socket, JSONRPCProtocol(), dispatcher) diff --git a/ryu/contrib/tinyrpc/transports/__init__.py b/ryu/contrib/tinyrpc/transports/__init__.py new file mode 100644 index 0000000..3bbc872 --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/__init__.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +class ServerTransport(object): + """Base class for all 
server transports.""" + + def receive_message(self): + """Receive a message from the transport. + + Blocks until another message has been received. May return a context + opaque to clients that should be passed on + :py:func:`~tinyrpc.transport.Transport.send_reply` to identify the + client later on. + + :return: A tuple consisting of ``(context, message)``. + """ + raise NotImplementedError() + + def send_reply(self, context, reply): + """Sends a reply to a client. + + The client is usually identified by passing ``context`` as returned + from the original + :py:func:`~tinyrpc.transport.Transport.receive_message` call. + + Messages must be strings, it is up to the sender to convert the + beforehand. A non-string value raises a :py:exc:`TypeError`. + + :param context: A context returned by + :py:func:`~tinyrpc.transport.Transport.receive_message`. + :param reply: A string to send back as the reply. + """ + raise NotImplementedError + + +class ClientTransport(object): + """Base class for all client transports.""" + + def send_message(self, message, expect_reply=True): + """Send a message to the server and possibly receive a reply. + + Sends a message to the connected server. + + Messages must be strings, it is up to the sender to convert the + beforehand. A non-string value raises a :py:exc:`TypeError`. + + This function will block until one reply has been received. + + :param message: A string to send. + :return: A string containing the server reply. + """ + raise NotImplementedError diff --git a/ryu/contrib/tinyrpc/transports/http.py b/ryu/contrib/tinyrpc/transports/http.py new file mode 100644 index 0000000..919f97f --- /dev/null +++ b/ryu/contrib/tinyrpc/transports/http.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from Queue import Queue +import threading +import requests + +from . import ServerTransport, ClientTransport + + +class HttpPostClientTransport(ClientTransport): + """HTTP POST based client transport. 
+
+    Requires :py:mod:`requests`. Submits messages to a server using the body of
+    an ``HTTP`` ``POST`` request. Replies are taken from the response's body.
+
+    :param endpoint: The URL to send ``POST`` data to.
+    :param kwargs: Additional parameters for :py:func:`requests.post`.
+    """
+    def __init__(self, endpoint, **kwargs):
+        self.endpoint = endpoint
+        self.request_kwargs = kwargs
+
+    def send_message(self, message, expect_reply=True):
+        if not isinstance(message, str):
+            raise TypeError('str expected')
+
+        r = requests.post(self.endpoint, data=message, **self.request_kwargs)
+
+        if expect_reply:
+            return r.content
diff --git a/ryu/contrib/tinyrpc/transports/tcp.py b/ryu/contrib/tinyrpc/transports/tcp.py
new file mode 100644
index 0000000..c5ac614
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/tcp.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from Queue import Queue
+import struct
+import threading
+
+from SocketServer import TCPServer, BaseRequestHandler, ThreadingMixIn
+
+from . import RPCRequestResponseServer
+
+
+def _read_length_prefixed_msg(sock, prefix_format='!I'):
+    prefix_bytes = struct.calcsize(prefix_format)
+
+    sock.recv(prefix_bytes)
+
+def _read_n_bytes(sock, n):
+    buf = []
+    while n > 0:
+        data = sock.recv(n)
+        n -= len(data)
+        buf.append(data)
+
+    return ''.join(buf)
+
+
+def create_length_prefixed_tcp_handler():
+    queue = Queue()
+    class LengthPrefixedTcpHandler(BaseRequestHandler):
+        def handle(self):
+            #msg = _read_length_prefixed_msg(self.request)
+            # this will run inside a new thread
+            self.request.send("hello\n")
+            while True:
+                b = _read_n_bytes(self.request, 10)
+                self.request.send("you sent: %s" % b)
+                queue.put(b)
+
+    return queue, LengthPrefixedTcpHandler
+
+
+def tcp_test_main():
+    class Server(ThreadingMixIn, TCPServer):
+        pass
+
+    queue, Handler = create_length_prefixed_tcp_handler()
+
+    server = Server(('localhost', 12345), Handler)
+    server.allow_reuse_address = True
+
+    server.serve_forever()
diff --git a/ryu/contrib/tinyrpc/transports/wsgi.py b/ryu/contrib/tinyrpc/transports/wsgi.py
new file mode 100644
index 0000000..f9a84c1
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/wsgi.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import Queue
+
+from werkzeug.wrappers import Response, Request
+
+from . import ServerTransport
+
+
+class WsgiServerTransport(ServerTransport):
+    """WSGI transport.
+
+    Requires :py:mod:`werkzeug`.
+
+    Due to the nature of WSGI, this transport has a few peculiarities: It must
+    be run in a thread, greenlet or some other form of concurrent execution
+    primitive.
+
+    This is due to
+    :py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.handle` blocking
+    while waiting for a call to
+    :py:func:`~tinyrpc.transports.wsgi.WsgiServerTransport.send_reply`.
+
+    The parameter ``queue_class`` must be used to supply a proper queue class
+    for the chosen concurrency mechanism (i.e. when using :py:mod:`gevent`,
+    set it to :py:class:`gevent.queue.Queue`).
+
+    :param max_content_length: The maximum request content size allowed. Should
+                               be set to a sane value to prevent DoS-Attacks.
+    :param queue_class: The Queue class to use.
+    :param allow_origin: The ``Access-Control-Allow-Origin`` header. Defaults
+                         to ``*`` (so change it if you need actual security).
+    """
+    def __init__(self, max_content_length=4096, queue_class=Queue.Queue,
+                 allow_origin='*'):
+        self._queue_class = queue_class
+        self.messages = queue_class()
+        self.max_content_length = max_content_length
+        self.allow_origin = allow_origin
+
+    def receive_message(self):
+        return self.messages.get()
+
+    def send_reply(self, context, reply):
+        if not isinstance(reply, str):
+            raise TypeError('str expected')
+
+        context.put(reply)
+
+    def handle(self, environ, start_response):
+        """WSGI handler function.
+
+        The transport will serve a request by reading the message and putting
+        it into an internal buffer. It will then block until another
+        concurrently running function sends a reply using
+        :py:func:`~tinyrpc.transports.WsgiServerTransport.send_reply`.
+
+        The reply will then be sent to the client being handled and handle will
+        return.
+        """
+        request = Request(environ)
+        request.max_content_length = self.max_content_length
+
+        access_control_headers = {
+            'Access-Control-Allow-Methods': 'POST',
+            'Access-Control-Allow-Origin': self.allow_origin,
+            'Access-Control-Allow-Headers': \
+                'Content-Type, X-Requested-With, Accept, Origin'
+        }
+
+        if request.method == 'OPTIONS':
+            response = Response(headers=access_control_headers)
+
+        elif request.method == 'POST':
+            # message is encoded in POST, read it...
+            msg = request.stream.read()
+
+            # create new context
+            context = self._queue_class()
+
+            self.messages.put((context, msg))
+
+            # ...and send the reply
+            response = Response(context.get(), headers=access_control_headers)
+        else:
+            # nothing else supported at the moment
+            response = Response('Only POST supported', 405)
+
+        return response(environ, start_response)
diff --git a/ryu/contrib/tinyrpc/transports/zmq.py b/ryu/contrib/tinyrpc/transports/zmq.py
new file mode 100644
index 0000000..502a1dd
--- /dev/null
+++ b/ryu/contrib/tinyrpc/transports/zmq.py
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import  # needed for zmq import
+import zmq
+
+from . import ServerTransport, ClientTransport
+
+
+class ZmqServerTransport(ServerTransport):
+    """Server transport based on a :py:const:`zmq.ROUTER` socket.
+
+    :param socket: A :py:const:`zmq.ROUTER` socket instance, bound to an
+                   endpoint.
+    """
+
+    def __init__(self, socket):
+        self.socket = socket
+
+    def receive_message(self):
+        msg = self.socket.recv_multipart()
+        return msg[:-1], msg[-1]
+
+    def send_reply(self, context, reply):
+        self.socket.send_multipart(context + [reply])
+
+    @classmethod
+    def create(cls, zmq_context, endpoint):
+        """Create new server transport.
+
+        Instead of creating the socket yourself, you can call this function and
+        merely pass the :py:class:`zmq.core.context.Context` instance.
+
+        By passing a context imported from :py:mod:`zmq.green`, you can use
+        green (gevent) 0mq sockets as well.
+
+        :param zmq_context: A 0mq context.
+        :param endpoint: The endpoint clients will connect to.
+        """
+        socket = zmq_context.socket(zmq.ROUTER)
+        socket.bind(endpoint)
+        return cls(socket)
+
+
+class ZmqClientTransport(ClientTransport):
+    """Client transport based on a :py:const:`zmq.REQ` socket.
+
+    :param socket: A :py:const:`zmq.REQ` socket instance, connected to the
+                   server socket.
+    """
+
+    def __init__(self, socket):
+        self.socket = socket
+
+    def send_message(self, message, expect_reply=True):
+        self.socket.send(message)
+
+        if expect_reply:
+            return self.socket.recv()
+
+    @classmethod
+    def create(cls, zmq_context, endpoint):
+        """Create new client transport.
+
+        Instead of creating the socket yourself, you can call this function and
+        merely pass the :py:class:`zmq.core.context.Context` instance.
+
+        By passing a context imported from :py:mod:`zmq.green`, you can use
+        green (gevent) 0mq sockets as well.
+
+        :param zmq_context: A 0mq context.
+        :param endpoint: The endpoint the server is bound to.
+        """
+        socket = zmq_context.socket(zmq.REQ)
+        socket.connect(endpoint)
+        return cls(socket)
diff --git a/tools/pip-requires b/tools/pip-requires
index 4ed8bc7..2528ebc 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -7,4 +7,3 @@ paramiko
 routes
 six>=1.4.0
 webob>=1.0.8
-tinyrpc
-- 
1.9.1

------------------------------------------------------------------------------
HPCC Systems Open Source Big Data Platform from LexisNexis Risk Solutions
Find What Matters Most in Your Big Data with HPCC Systems
Open Source. Fast. Scalable. Simple. Ideal for Dirty Data.
Leverages Graph Analysis for Fast Processing & Easy Data Exploration
http://p.sf.net/sfu/hpccsystems
_______________________________________________
Ryu-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/ryu-devel
