On Wed, May 08, 2013 at 03:32:36PM +0900, YAMAMOTO Takashi wrote: > - make create_request returns msgid > - fix msgid wrap around > - rename classes > - convenient transport classes for socket-like > - update requirements > - copyright notice, comments, and assertions > > Signed-off-by: YAMAMOTO Takashi <yamam...@valinux.co.jp> > --- > ryu/lib/rpc.py | 277 > ++++++++++++++++++++++++++++++++++++++++++++++++++--- > setup.cfg | 2 +- > tools/pip-requires | 1 + > 3 files changed, 264 insertions(+), 16 deletions(-) > > diff --git a/ryu/lib/rpc.py b/ryu/lib/rpc.py > index 8455791..06cd0c3 100644 > --- a/ryu/lib/rpc.py > +++ b/ryu/lib/rpc.py > @@ -1,37 +1,284 @@ > +#!/usr/bin/env python > +# > +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. > +# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp> > +# > +# Licensed under the Apache License, Version 2.0 (the "License"); > +# you may not use this file except in compliance with the License. > +# You may obtain a copy of the License at > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > +# implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. > + > +# msgpack-rpc > +# http://wiki.msgpack.org/display/MSGPACK/RPC+specification > + > import msgpack > > > -class RpcMessage(object): > +class MessageType(object): > REQUEST = 0 > RESPONSE = 1 > NOTIFY = 2 > > > -class RpcSession(object): > +class MessageEncoder(object): > + """msgpack-rpc encoder/decoder. > + intended to be transport-agnostic. > + """ > def __init__(self): > - super(RpcSession, self).__init__() > - self._packer = msgpack.Packer() > - self._unpacker = msgpack.Unpacker() > + super(MessageEncoder, self).__init__() > + # note: on-wire msgpack has no notion of encoding. > + # the msgpack-python library implicitly converts unicode to > + # utf-8 encoded bytes by default. we don't want to rely on > + # the behaviour though because it seems to be going to change. > + # cf. https://gist.github.com/methane/5022403 > + self._packer = msgpack.Packer(encoding=None) > + self._unpacker = msgpack.Unpacker(encoding=None) > self._next_msgid = 0 > > def _create_msgid(self): > this_id = self._next_msgid > - self._next_msgid += 1 > + self._next_msgid = (self._next_msgid + 1) % 0xffffffff > return this_id > > def create_request(self, method, params): > + assert isinstance(method, str) > + assert isinstance(params, list) > msgid = self._create_msgid() > - return self._packer.pack([RpcMessage.REQUEST, msgid, method, params]) > + return (self._packer.pack([MessageType.REQUEST, msgid, method, > + params]), msgid) > > - def create_response(self, msgid, error, result): > - return self._packer.pack([RpcMessage.RESPONSE, msgid, error, result]) > + def create_response(self, msgid, error=None, result=None): > + assert isinstance(msgid, int) > + assert 0 <= msgid and msgid <= 0xffffffff > + assert error is None or result is None > + return self._packer.pack([MessageType.RESPONSE, msgid, error, > result]) > > def create_notification(self, method, params): > - return self._packer.pack([RpcMessage.NOTIFY, method, params]) > + assert isinstance(method, str) > + assert isinstance(params, list) > + return self._packer.pack([MessageType.NOTIFY, method, params]) > > - def get_messages(self, data): > + def get_and_dispatch_messages(self, data, disp_table): > + """dissect messages from a raw stream data. > + disp_table[type] should be a callable for the corresponding > + MessageType. > + """ > self._unpacker.feed(data) > - messages = [] > - for msg in self._unpacker: > - messages.append(msg) > - return messages > + for m in self._unpacker: > + self._dispatch_message(m, disp_table) > + > + def _dispatch_message(self, m, disp_table): > + # XXX validation > + type = m[0] > + try: > + f = disp_table[type] > + except KeyError: > + # ignore messages with unknown type > + return > + f(m[1:]) > + > + > +from collections import deque > + > + > +class EndPoint(object): > + """An endpoint > + *sock* is a socket-like. it can be either blocking or non-blocking. > + """ > + def __init__(self, sock, encoder=None, disp_table=None): > + if encoder is None: > + encoder = MessageEncoder() > + self._encoder = encoder > + self._sock = sock > + if disp_table is None: > + self._table = { > + MessageType.REQUEST: self._enqueue_incoming_request, > + MessageType.RESPONSE: self._enqueue_incoming_response, > + MessageType.NOTIFY: self._enqueue_incoming_notification > + } > + else: > + self._table = disp_table > + self._send_buffer = bytearray() > + # msgids for which we sent a request but have not received a response > + self._pending_requests = set() > + # queues for incoming messages > + self._requests = deque() > + self._notifications = deque() > + self._responses = {} > + self._incoming = 0 # number of incoming messages in our queues > + > + def selectable(self): > + return self._sock > + > + def process_outgoing(self): > + try: > + sent_bytes = self._sock.send(self._send_buffer) > + except IOError: > + sent_bytes = 0 > + del self._send_buffer[:sent_bytes] > + > + def process_incoming(self): > + self.receive_messages(all=True) > + > + def _send_message(self, msg): > + self._send_buffer += msg > + self.process_outgoing() > + > + def send_request(self, method, params): > + """Send a request > + """ > + msg, msgid = self._encoder.create_request(method, params) > + self._send_message(msg) > + self._pending_requests.add(msgid)
Is this order safe? What if response arriving before sending returning _send_message(msg)? > + return msgid > + > + def send_response(self, msgid, error=None, result=None): > + """Send a response > + """ > + msg = self._encoder.create_response(msgid, error, result) > + self._send_message(msg) > + > + def send_notification(self, method, params): > + """Send a notification > + """ > + msg = self._encoder.create_notification(method, params) > + self._send_message(msg) > + > + def receive_messages(self, all=False): > + """Try to receive some messages. > + Received messages are put on the internal queues. > + They can be retrieved using get_xxx() methods. > + Returns True if there's something queued for get_xxx() methods. > + """ > + while all or self._incoming == 0: > + try: > + packet = self._sock.recv(4096) # XXX the size is arbitrary > + except IOError: > + packet = None > + if not packet: > + break > + self._encoder.get_and_dispatch_messages(packet, self._table) > + return self._incoming > 0 > + > + def _enqueue_incoming_request(self, m): > + self._requests.append(m) > + self._incoming += 1 > + > + def _enqueue_incoming_response(self, m): > + msgid, error, result = m > + try: > + self._pending_requests.remove(msgid) > + except KeyError: > + # bogus msgid > + # XXXwarn > + return > + assert not msgid in self._responses > + self._responses[msgid] = (error, result) > + self._incoming += 1 > + > + def _enqueue_incoming_notification(self, m): > + self._notifications.append(m) > + self._incoming += 1 > + > + def _get_message(self, q): > + try: > + m = q.popleft() > + assert self._incoming > 0 > + self._incoming -= 1 > + return m > + except IndexError: > + return None > + > + def get_request(self): > + return self._get_message(self._requests) > + > + def get_response(self, msgid): > + try: > + m = self._responses.pop(msgid) > + assert self._incoming > 0 > + self._incoming -= 1 > + except KeyError: > + return None > + error, result = m > + return (result, error) > + > + def get_notification(self): > + return self._get_message(self._notifications) > + > + > +class RPCError(Exception): > + """an error from server > + """ > + def __init__(self, error): > + self._error = error > + > + def get_value(self): > + return self._error > + > + def __str__(self): > + return str(self._error) > + > + > +class Client(object): > + """a convenient class for a pure rpc client > + *sock* is a socket-like. it should be blocking. > + """ > + def __init__(self, sock, encoder=None, notification_callback=None): > + self._endpoint = EndPoint(sock, encoder) > + if notification_callback is None: > + # ignore notifications by default > + self._notification_callback = lambda n: None > + else: > + self._notification_callback = notification_callback > + > + def _process_input_notification(self): > + n = self._endpoint.get_notification() > + if n: > + self._notification_callback(n) > + > + def _process_input_request(self): > + # ignore requests as we are a pure client > + # XXXwarn > + self._endpoint.get_request() > + > + def call(self, method, params): > + """synchronous call. > + send a request and wait for a response. > + return a result. or raise RPCError exception if the peer > + sends us an error. > + """ > + msgid = self._endpoint.send_request(method, params) > + while True: > + if not self._endpoint.receive_messages(): > + raise EOFError("EOF") > + res = self._endpoint.get_response(msgid) > + if res: > + result, error = res > + if error is None: > + return result > + raise RPCError(error) > + self._process_input_notification() > + self._process_input_request() > + > + def send_notification(self, method, params): > + """send a notification to the peer. > + """ > + self._endpoint.send_notification(method, params) > + > + def receive_notification(self): > + """wait for the next incoming message. > + intended to be used when we have nothing to send but want to receive > + notifications. > + """ > + if not self._endpoint.receive_messages(): > + raise EOFError("EOF") > + self._process_input_notification() > + self._process_input_request() > diff --git a/setup.cfg b/setup.cfg > index 41d1179..0447b1f 100644 > --- a/setup.cfg > +++ b/setup.cfg > @@ -6,7 +6,7 @@ source-dir = doc/source > [bdist_rpm] > Release = 1 > Group = Applications/Accessories > -Requires = python-eventlet, python-routes, python-webob, python-paramiko > +Requires = python-eventlet, python-routes, python-webob, python-paramiko, > python-msgpack > doc_files = LICENSE > MANIFEST.in > README.rst > diff --git a/tools/pip-requires b/tools/pip-requires > index cf7f06b..59c1857 100644 > --- a/tools/pip-requires > +++ b/tools/pip-requires > @@ -2,3 +2,4 @@ eventlet > routes > webob>=1.0.8 > paramiko > +msgpack-python > -- > 1.8.0.1 > > > ------------------------------------------------------------------------------ > Learn Graph Databases - Download FREE O'Reilly Book > "Graph Databases" is the definitive new guide to graph databases and > their applications. This 200-page book is written by three acclaimed > leaders in the field. The early access version is available now. > Download your free book today! http://p.sf.net/sfu/neotech_d2d_may > _______________________________________________ > Ryu-devel mailing list > Ryu-devel@lists.sourceforge.net > https://lists.sourceforge.net/lists/listinfo/ryu-devel > -- yamahata ------------------------------------------------------------------------------ Learn Graph Databases - Download FREE O'Reilly Book "Graph Databases" is the definitive new guide to graph databases and their applications. This 200-page book is written by three acclaimed leaders in the field. The early access version is available now. Download your free book today! http://p.sf.net/sfu/neotech_d2d_may _______________________________________________ Ryu-devel mailing list Ryu-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/ryu-devel