Saggi Mizrahi has uploaded a new change for review.

Change subject: Federate context and request-response objects
......................................................................

Federate context and request-response objects

This is the groundwork needed to allow batch requests and bidirectional
jsonrpc.

Change-Id: I091ca9acad5940418cbf3961a76a06cbfef976f0
Signed-off-by: Saggi Mizrahi <[email protected]>
---
M vdsm_api/jsonrpc/__init__.py
1 file changed, 96 insertions(+), 74 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/75/10375/1

diff --git a/vdsm_api/jsonrpc/__init__.py b/vdsm_api/jsonrpc/__init__.py
index 2ef6e36..3e6a6aa 100644
--- a/vdsm_api/jsonrpc/__init__.py
+++ b/vdsm_api/jsonrpc/__init__.py
@@ -59,53 +59,27 @@
         JsonRpcError.__init__(self, -32603, msg)
 
 
-class _JsonRpcRequest(object):
-    def __init__(self, ctx, queue, methodName, params, reqId):
-        self.method = methodName
+class JsonRpcRequest(object):
+    def __init__(self, method, params=(), reqId=None):
+        self.method = method
         self.params = params
         self.id = reqId
-        self._ctx = ctx
-        self._queue = queue
 
-    def invokeFunction(self, func):
-        if isinstance(self.params, list):
-            return func(*self.params)
-        else:
-            return func(**self.params)
+    def encode(self):
+        res = {'jsonrpc': '2.0',
+               'method': self.method,
+               'params': self.params,
+               'id': self.id}
 
-    def isNotification(self):
-        return self.id is None
+        return json.dumps(res, 'utf-8')
 
-    def sendReply(self, result, error):
-        # TBD: Should calling this for a notification raise an error or be
-        #      ignored
-        self._queue.put_nowait(_JsonRpcResponse(self._ctx, self.id, result,
-                                                error))
-
-
-class _JsonRpcResponse(object):
-    def __init__(self, ctx, reqId, result, error):
-        self.ctx = ctx
-        self.result = result
-        self.error = error
-        self.id = reqId
-
-
-class JsonRpcServer(object):
-    log = logging.getLogger("jsonrpc.JsonRpcServer")
-
-    def __init__(self, bridge, threadFactory=None):
-        self._bridge = bridge
-        self._workQueue = Queue()
-        self._threadFactory = threadFactory
-
-    def _parseMessage(self, msg):
+    @staticmethod
+    def decode(msg):
         try:
-            return json.loads(msg, 'utf-8')
+            obj = json.loads(msg, 'utf-8')
         except:
             raise JsonRpcParseError()
 
-    def _parseRequest(self, obj, ctx, queue):
         if obj.get("jsonrpc") != "2.0":
             raise JsonRpcInvalidRequestError()
 
@@ -121,59 +95,106 @@
         if not isinstance(params, (list, dict)):
             raise JsonRpcInvalidRequestError()
 
-        return _JsonRpcRequest(ctx, queue, method, params, reqId)
+        return JsonRpcRequest(method, params, reqId)
 
-    def _jsonError2Response(self, err, req):
-        respId = None
-        if req is not None:
-            respId = req.id
+    def isNotification(self):
+        return (self.id is None)
 
-        return json.dumps({"jsonrpc": "2.0",
-                           "error": {"code": err.code, "message": err.message},
-                           "id": respId})
 
-    def _generateResponse(self, resp):
-        res = {"jsonrpc": "2.0",
-               "id": resp.id}
-        if resp.error is not None:
-            res['error'] = {'code': resp.error.code,
-                            'message': resp.error.message}
+class JsonRpcResponse(object):
+    def __init__(self, result=None, error=None, reqId=None):
+        self.result = result
+        self.error = error
+        self.id = reqId
+
+    def encode(self):
+        res = {'jsonrpc': '2.0',
+               'id': self.id}
+
+        if self.error is not None:
+            res['error'] = {'code': self.error.code,
+                            'message': self.error.message}
         else:
-            res['result'] = resp.result
+            res['result'] = self.result
 
         return json.dumps(res, 'utf-8')
 
-    def _serveRequest(self, req):
+    @staticmethod
+    def decode(msg):
+        obj = json.loads(msg, 'utf-8')
+        # TODO: More validations
+        result = obj.get('result')
+        error = JsonRpcError(**obj.get('error'))
+        reqId = obj.get('id')
+        return JsonRpcResponse(result, error, reqId)
+
+
+class _JsonRpcRequestContext(object):
+    def __init__(self, ctx, queue, request):
+        self.request = request
+        self._ctx = ctx
+        self._queue = queue
+
+    def isNotification(self):
+        return self.id is None
+
+    def sendReply(self, result, error):
+        # TBD: Should calling this for a notification raise an error or be
+        #      ignored
+        resp = JsonRpcResponse(result,
+                               error,
+                               self.request.id)
+        self._queue.put_nowait(_JsonRpcResponseContext(self._ctx, resp))
+
+
+class _JsonRpcResponseContext(object):
+    def __init__(self, ctx, response):
+        self.response = response
+        self.ctx = ctx
+
+
+class JsonRpcServer(object):
+    log = logging.getLogger("jsonrpc.JsonRpcServer")
+
+    def __init__(self, bridge, threadFactory=None):
+        self._bridge = bridge
+        self._workQueue = Queue()
+        self._threadFactory = threadFactory
+
+    def _serveRequest(self, ctx):
+        req = ctx.request
         mangledMethod = req.method.replace(".", "_")
         self.log.debug("Looking for method '%s' in bridge",
                        mangledMethod)
         try:
             method = getattr(self._bridge, mangledMethod)
         except AttributeError:
-            req.sendReply(None, JsonRpcMethodNotFoundError())
+            ctx.sendReply(None, JsonRpcMethodNotFoundError())
         else:
             try:
-                res = req.invokeFunction(method)
+                params = req.params
+                if isinstance(req.params, list):
+                    res = method(*params)
+                else:
+                    res = method(**params)
             except JsonRpcError as e:
-                req.sendReply(None, e)
+                ctx.sendReply(None, e)
             except Exception as e:
-                req.sendReply(None, JsonRpcInternalError(str(e)))
+                ctx.sendReply(None, JsonRpcInternalError(str(e)))
             else:
-                return req.sendReply(res, None)
+                return ctx.sendReply(res, None)
 
-    def _processResponse(self, resp):
+    def _processResponse(self, ctx):
         try:
-            msg = self._generateResponse(resp)
+            msg = ctx.response.encode()
         except Exception as e:
             # Probably result failed to be serialized as json
-            errResp = _JsonRpcResponse(resp.ctx,
-                                       resp.id,
-                                       None,
-                                       JsonRpcInternalError(str(e)))
+            errResp = JsonRpcResponse(error=JsonRpcInternalError(str(e)),
+                                      reqId=ctx.response.id)
 
-            msg = self._generateResponse(errResp)
+            msg = errResp.encode()
 
-        resp.ctx.sendReply(msg)
+        ctx.ctx.sendReply(msg)
 
     def serve_requests(self):
         while True:
@@ -181,7 +202,7 @@
             if obj is None:
                 break
 
-            if isinstance(obj, _JsonRpcRequest):
+            if isinstance(obj, _JsonRpcRequestContext):
                 if self._threadFactory is None:
                     self._serveRequest(obj)
                 else:
@@ -194,10 +215,10 @@
         req = None
         error = None
         try:
-            obj = self._parseMessage(msgCtx.data)
-            req = self._parseRequest(obj, msgCtx, self._workQueue)
+            req = JsonRpcRequest.decode(msgCtx.data)
+            ctx = _JsonRpcRequestContext(msgCtx, self._workQueue, req)
 
-            self._workQueue.put_nowait(req)
+            self._workQueue.put_nowait(ctx)
             return
 
         except JsonRpcError as e:
@@ -214,11 +235,12 @@
             return
 
         if req is None:
-            resp = _JsonRpcResponse(msgCtx, None, None, error)
+            resp = JsonRpcResponse(None, error, None)
         else:
-            resp = _JsonRpcResponse(msgCtx, req.id, None, error)
+            resp = JsonRpcResponse(None, error, req.id)
 
-        self._workQueue.put_nowait(resp)
+        ctx = _JsonRpcResponseContext(msgCtx, resp)
+        self._workQueue.put_nowait(ctx)
 
     def stop(self):
         self._workQueue.put_nowait(None)


--
To view, visit http://gerrit.ovirt.org/10375
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I091ca9acad5940418cbf3961a76a06cbfef976f0
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to