Saggi Mizrahi has uploaded a new change for review. Change subject: Federate context and request-response objects ......................................................................
Federate context and request-response objects This is the ground work needed to allow batch requests and bidirectional jsonrpc. Change-Id: I091ca9acad5940418cbf3961a76a06cbfef976f0 Signed-off-by: Saggi Mizrahi <[email protected]> --- M vdsm_api/jsonrpc/__init__.py 1 file changed, 96 insertions(+), 74 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/75/10375/1 diff --git a/vdsm_api/jsonrpc/__init__.py b/vdsm_api/jsonrpc/__init__.py index 2ef6e36..3e6a6aa 100644 --- a/vdsm_api/jsonrpc/__init__.py +++ b/vdsm_api/jsonrpc/__init__.py @@ -59,53 +59,27 @@ JsonRpcError.__init__(self, -32603, msg) -class _JsonRpcRequest(object): - def __init__(self, ctx, queue, methodName, params, reqId): - self.method = methodName +class JsonRpcRequest(object): + def __init__(self, method, params=(), reqId=None): + self.method = method self.params = params self.id = reqId - self._ctx = ctx - self._queue = queue - def invokeFunction(self, func): - if isinstance(self.params, list): - return func(*self.params) - else: - return func(**self.params) + def encode(self): + res = {'jsonrpc': '2.0', + 'method': self.method, + 'params': self.params, + 'id': self.id} - def isNotification(self): - return self.id is None + return json.dumps(res, 'utf-8') - def sendReply(self, result, error): - # TBD: Should calling this for a notification raise an error or be - # ignored - self._queue.put_nowait(_JsonRpcResponse(self._ctx, self.id, result, - error)) - - -class _JsonRpcResponse(object): - def __init__(self, ctx, reqId, result, error): - self.ctx = ctx - self.result = result - self.error = error - self.id = reqId - - -class JsonRpcServer(object): - log = logging.getLogger("jsonrpc.JsonRpcServer") - - def __init__(self, bridge, threadFactory=None): - self._bridge = bridge - self._workQueue = Queue() - self._threadFactory = threadFactory - - def _parseMessage(self, msg): + @staticmethod + def decode(msg): try: - return json.loads(msg, 'utf-8') + obj = json.loads(msg, 'utf-8') except: raise JsonRpcParseError() - def _parseRequest(self, obj, ctx, queue): if obj.get("jsonrpc") != "2.0": raise JsonRpcInvalidRequestError() @@ -121,59 +95,106 @@ if not isinstance(params, (list, dict)): raise JsonRpcInvalidRequestError() - return _JsonRpcRequest(ctx, queue, method, params, reqId) + return JsonRpcRequest(method, params, reqId) - def _jsonError2Response(self, err, req): - respId = None - if req is not None: - respId = req.id + def isNotification(self): + return (self.id is None) - return json.dumps({"jsonrpc": "2.0", - "error": {"code": err.code, "message": err.message}, - "id": respId}) - def _generateResponse(self, resp): - res = {"jsonrpc": "2.0", - "id": resp.id} - if resp.error is not None: - res['error'] = {'code': resp.error.code, - 'message': resp.error.message} +class JsonRpcResponse(object): + def __init__(self, result=None, error=None, reqId=None): + self.result = result + self.error = error + self.id = reqId + + def encode(self): + res = {'jsonrpc': '2.0', + 'id': self.id} + + if self.error is not None: + res['error'] = {'code': self.error.code, + 'message': self.error.message} else: - res['result'] = resp.result + res['result'] = self.result return json.dumps(res, 'utf-8') - def _serveRequest(self, req): + @staticmethod + def decode(msg): + obj = json.loads(msg, 'utf-8') + # TODO: More validations + result = obj.get('result') + error = JsonRpcError(**obj.get('error')) + reqId = obj.get('id') + return JsonRpcResponse(result, error, reqId) + + +class _JsonRpcRequestContext(object): + def __init__(self, ctx, queue, request): + self.request = request + self._ctx = ctx + self._queue = queue + + def isNotification(self): + return self.id is None + + def sendReply(self, result, error): + # TBD: Should calling this for a notification raise an error or be + # ignored + resp = JsonRpcResponse(result, + error, + self.request.id) + self._queue.put_nowait(_JsonRpcResponseContext(self._ctx, resp)) + + +class _JsonRpcResponseContext(object): + def __init__(self, ctx, response): + self.response = response + self.ctx = ctx + + +class JsonRpcServer(object): + log = logging.getLogger("jsonrpc.JsonRpcServer") + + def __init__(self, bridge, threadFactory=None): + self._bridge = bridge + self._workQueue = Queue() + self._threadFactory = threadFactory + + def _serveRequest(self, ctx): + req = ctx.request mangledMethod = req.method.replace(".", "_") self.log.debug("Looking for method '%s' in bridge", mangledMethod) try: method = getattr(self._bridge, mangledMethod) except AttributeError: - req.sendReply(None, JsonRpcMethodNotFoundError()) + ctx.sendReply(None, JsonRpcMethodNotFoundError()) else: try: - res = req.invokeFunction(method) + params = req.params + if isinstance(req.params, list): + res = method(*params) + else: + res = method(**params) except JsonRpcError as e: - req.sendReply(None, e) + ctx.sendReply(None, e) except Exception as e: - req.sendReply(None, JsonRpcInternalError(str(e))) + ctx.sendReply(None, JsonRpcInternalError(str(e))) else: - return req.sendReply(res, None) + return ctx.sendReply(res, None) - def _processResponse(self, resp): + def _processResponse(self, ctx): try: - msg = self._generateResponse(resp) + msg = ctx.response.encode() except Exception as e: # Probably result failed to be serialized as json - errResp = _JsonRpcResponse(resp.ctx, - resp.id, - None, - JsonRpcInternalError(str(e))) + errResp = JsonRpcResponse(error=JsonRpcInternalError(str(e)), + reqId=ctx.response.id) - msg = self._generateResponse(errResp) + msg = errResp.encode() - resp.ctx.sendReply(msg) + ctx.ctx.sendReply(msg) def serve_requests(self): while True: @@ -181,7 +202,7 @@ if obj is None: break - if isinstance(obj, _JsonRpcRequest): + if isinstance(obj, _JsonRpcRequestContext): if self._threadFactory is None: self._serveRequest(obj) else: @@ -194,10 +215,10 @@ req = None error = None try: - obj = self._parseMessage(msgCtx.data) - req = self._parseRequest(obj, msgCtx, self._workQueue) + req = JsonRpcRequest.decode(msgCtx.data) + ctx = _JsonRpcRequestContext(msgCtx, self._workQueue, req) - self._workQueue.put_nowait(req) + self._workQueue.put_nowait(ctx) return except JsonRpcError as e: @@ -214,11 +235,12 @@ return if req is None: - resp = _JsonRpcResponse(msgCtx, None, None, error) + resp = JsonRpcResponse(None, error, None) else: - resp = _JsonRpcResponse(msgCtx, req.id, None, error) + resp = JsonRpcResponse(None, error, req.id) - self._workQueue.put_nowait(resp) + ctx = _JsonRpcResponseContext(msgCtx, resp) + self._workQueue.put_nowait(ctx) def stop(self): self._workQueue.put_nowait(None) -- To view, visit http://gerrit.ovirt.org/10375 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I091ca9acad5940418cbf3961a76a06cbfef976f0 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
