- make create_request returns msgid
- fix msgid wrap around
- rename classes
- convenient transport classes for socket-like
- update requirements
- copyright notice, comments, and assertions

Signed-off-by: YAMAMOTO Takashi <yamam...@valinux.co.jp>
---
 ryu/lib/rpc.py     | 295 ++++++++++++++++++++++++++++++++++++++++++++++++++---
 setup.cfg          |   2 +-
 tools/pip-requires |   1 +
 3 files changed, 282 insertions(+), 16 deletions(-)

diff --git a/ryu/lib/rpc.py b/ryu/lib/rpc.py
index 8455791..57a35ba 100644
--- a/ryu/lib/rpc.py
+++ b/ryu/lib/rpc.py
@@ -1,37 +1,302 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
+# Copyright (C) 2013 YAMAMOTO Takashi <yamamoto at valinux co jp>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# msgpack-rpc
+# http://wiki.msgpack.org/display/MSGPACK/RPC+specification
+
 import msgpack
 
 
-class RpcMessage(object):
+class MessageType(object):
     REQUEST = 0
     RESPONSE = 1
     NOTIFY = 2
 
 
-class RpcSession(object):
+class MessageEncoder(object):
+    """msgpack-rpc encoder/decoder.
+    intended to be transport-agnostic.
+    """
     def __init__(self):
-        super(RpcSession, self).__init__()
-        self._packer = msgpack.Packer()
-        self._unpacker = msgpack.Unpacker()
+        super(MessageEncoder, self).__init__()
+        # note: on-wire msgpack has no notion of encoding.
+        # the msgpack-python library implicitly converts unicode to
+        # utf-8 encoded bytes by default.  we don't want to rely on
+        # the behaviour though because it seems to be going to change.
+        # cf. https://gist.github.com/methane/5022403
+        self._packer = msgpack.Packer(encoding=None)
+        self._unpacker = msgpack.Unpacker(encoding=None)
         self._next_msgid = 0
 
     def _create_msgid(self):
         this_id = self._next_msgid
-        self._next_msgid += 1
+        self._next_msgid = (self._next_msgid + 1) % 0xffffffff
         return this_id
 
     def create_request(self, method, params):
+        assert isinstance(method, str)
+        assert isinstance(params, list)
         msgid = self._create_msgid()
-        return self._packer.pack([RpcMessage.REQUEST, msgid, method, params])
+        return (self._packer.pack([MessageType.REQUEST, msgid, method,
+                                  params]), msgid)
 
-    def create_response(self, msgid, error, result):
-        return self._packer.pack([RpcMessage.RESPONSE, msgid, error, result])
+    def create_response(self, msgid, error=None, result=None):
+        assert isinstance(msgid, int)
+        assert 0 <= msgid and msgid <= 0xffffffff
+        assert error is None or result is None
+        return self._packer.pack([MessageType.RESPONSE, msgid, error, result])
 
     def create_notification(self, method, params):
-        return self._packer.pack([RpcMessage.NOTIFY, method, params])
+        assert isinstance(method, str)
+        assert isinstance(params, list)
+        return self._packer.pack([MessageType.NOTIFY, method, params])
 
-    def get_messages(self, data):
+    def get_and_dispatch_messages(self, data, disp_table):
+        """dissect messages from a raw stream data.
+        disp_table[type] should be a callable for the corresponding
+        MessageType.
+        """
         self._unpacker.feed(data)
-        messages = []
-        for msg in self._unpacker:
-            messages.append(msg)
-        return messages
+        for m in self._unpacker:
+            self._dispatch_message(m, disp_table)
+
+    def _dispatch_message(self, m, disp_table):
+        # XXX validation
+        type = m[0]
+        try:
+            f = disp_table[type]
+        except KeyError:
+            # ignore messages with unknown type
+            return
+        f(m[1:])
+
+
+from collections import deque
+import select
+
+
+class EndPoint(object):
+    """An endpoint
+    *sock* is a socket-like.  it can be either blocking or non-blocking.
+    """
+    def __init__(self, sock, encoder=None, disp_table=None):
+        if encoder is None:
+            encoder = MessageEncoder()
+        self._encoder = encoder
+        self._sock = sock
+        if disp_table is None:
+            self._table = {
+                MessageType.REQUEST: self._enqueue_incoming_request,
+                MessageType.RESPONSE: self._enqueue_incoming_response,
+                MessageType.NOTIFY: self._enqueue_incoming_notification
+            }
+        else:
+            self._table = disp_table
+        self._send_buffer = bytearray()
+        # msgids for which we sent a request but have not received a response
+        self._pending_requests = set()
+        # queues for incoming messages
+        self._requests = deque()
+        self._notifications = deque()
+        self._responses = {}
+        self._incoming = 0  # number of incoming messages in our queues
+
+    def selectable(self):
+        rlist = [self._sock]
+        wlist = []
+        if self._send_buffer:
+            wlist.append(self._sock)
+        return rlist, wlist
+
+    def process_outgoing(self):
+        try:
+            sent_bytes = self._sock.send(self._send_buffer)
+        except IOError:
+            sent_bytes = 0
+        del self._send_buffer[:sent_bytes]
+
+    def process_incoming(self):
+        self.receive_messages(all=True)
+
+    def process(self):
+        self.process_outgoing()
+        self.process_incoming()
+
+    def block(self):
+        rlist, wlist = self.selectable()
+        select.select(rlist, wlist, rlist + wlist)
+
+    def serve(self):
+        while True:
+            self.block()
+            self.process()
+
+    def _send_message(self, msg):
+        self._send_buffer += msg
+        self.process_outgoing()
+
+    def send_request(self, method, params):
+        """Send a request
+        """
+        msg, msgid = self._encoder.create_request(method, params)
+        self._send_message(msg)
+        self._pending_requests.add(msgid)
+        return msgid
+
+    def send_response(self, msgid, error=None, result=None):
+        """Send a response
+        """
+        msg = self._encoder.create_response(msgid, error, result)
+        self._send_message(msg)
+
+    def send_notification(self, method, params):
+        """Send a notification
+        """
+        msg = self._encoder.create_notification(method, params)
+        self._send_message(msg)
+
+    def receive_messages(self, all=False):
+        """Try to receive some messages.
+        Received messages are put on the internal queues.
+        They can be retrieved using get_xxx() methods.
+        Returns True if there's something queued for get_xxx() methods.
+        """
+        while all or self._incoming == 0:
+            try:
+                packet = self._sock.recv(4096)  # XXX the size is arbitrary
+            except IOError:
+                packet = None
+            if not packet:
+                break
+            self._encoder.get_and_dispatch_messages(packet, self._table)
+        return self._incoming > 0
+
+    def _enqueue_incoming_request(self, m):
+        self._requests.append(m)
+        self._incoming += 1
+
+    def _enqueue_incoming_response(self, m):
+        msgid, error, result = m
+        try:
+            self._pending_requests.remove(msgid)
+        except KeyError:
+            # bogus msgid
+            # XXXwarn
+            return
+        assert not msgid in self._responses
+        self._responses[msgid] = (error, result)
+        self._incoming += 1
+
+    def _enqueue_incoming_notification(self, m):
+        self._notifications.append(m)
+        self._incoming += 1
+
+    def _get_message(self, q):
+        try:
+            m = q.popleft()
+            assert self._incoming > 0
+            self._incoming -= 1
+            return m
+        except IndexError:
+            return None
+
+    def get_request(self):
+        return self._get_message(self._requests)
+
+    def get_response(self, msgid):
+        try:
+            m = self._responses.pop(msgid)
+            assert self._incoming > 0
+            self._incoming -= 1
+        except KeyError:
+            return None
+        error, result = m
+        return (result, error)
+
+    def get_notification(self):
+        return self._get_message(self._notifications)
+
+
+class RPCError(Exception):
+    """an error from server
+    """
+    def __init__(self, error):
+        self._error = error
+
+    def get_value(self):
+        return self._error
+
+    def __str__(self):
+        return str(self._error)
+
+
+class Client(object):
+    """a convenient class for a pure rpc client
+    *sock* is a socket-like.  it should be blocking.
+    """
+    def __init__(self, sock, encoder=None, notification_callback=None):
+        self._endpoint = EndPoint(sock, encoder)
+        if notification_callback is None:
+            # ignore notifications by default
+            self._notification_callback = lambda n: None
+        else:
+            self._notification_callback = notification_callback
+
+    def _process_input_notification(self):
+        n = self._endpoint.get_notification()
+        if n:
+            self._notification_callback(n)
+
+    def _process_input_request(self):
+        # ignore requests as we are a pure client
+        # XXXwarn
+        self._endpoint.get_request()
+
+    def call(self, method, params):
+        """synchronous call.
+        send a request and wait for a response.
+        return a result.  or raise RPCError exception if the peer
+        sends us an error.
+        """
+        msgid = self._endpoint.send_request(method, params)
+        while True:
+            if not self._endpoint.receive_messages():
+                raise EOFError("EOF")
+            res = self._endpoint.get_response(msgid)
+            if res:
+                result, error = res
+                if error is None:
+                    return result
+                raise RPCError(error)
+            self._process_input_notification()
+            self._process_input_request()
+
+    def send_notification(self, method, params):
+        """send a notification to the peer.
+        """
+        self._endpoint.send_notification(method, params)
+
+    def receive_notification(self):
+        """wait for the next incoming message.
+        intended to be used when we have nothing to send but want to receive
+        notifications.
+        """
+        if not self._endpoint.receive_messages():
+            raise EOFError("EOF")
+        self._process_input_notification()
+        self._process_input_request()
diff --git a/setup.cfg b/setup.cfg
index 1cbeedd..a8d6a97 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -6,7 +6,7 @@ source-dir = doc/source
 [bdist_rpm]
 Release = 1
 Group = Applications/Accessories
-Requires = python-eventlet, python-routes, python-webob, python-paramiko, 
python-netaddr
+Requires = python-eventlet, python-routes, python-webob, python-paramiko, 
python-netaddr, python-msgpack
 doc_files = LICENSE
             MANIFEST.in
             README.rst
diff --git a/tools/pip-requires b/tools/pip-requires
index 208b9b8..da04e2f 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -3,3 +3,4 @@ routes
 webob>=1.0.8
 paramiko
 netaddr
+msgpack-python
-- 
1.8.1.5


------------------------------------------------------------------------------
This SF.net email is sponsored by Windows:

Build for Windows Store.

http://p.sf.net/sfu/windows-dev2dev
_______________________________________________
Ryu-devel mailing list
Ryu-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/ryu-devel

Reply via email to