Adam Litke has uploaded a new change for review. Change subject: jsonrpc: Rearrange test classes for reusability
jsonrpc: Rearrange test classes for reusability Signed-off-by: Adam Litke <[email protected]> Change-Id: I78f9546c0bfb2348510340922ba95569b1827830 --- M tests/jsonRpcTests.py A tests/jsonRpcUtils.py A vdsm_api/jsonrpc/client.py 3 files changed, 139 insertions(+), 125 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/22/11122/1 diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py index f157950..6e90d58d 100644 --- a/tests/jsonRpcTests.py +++ b/tests/jsonRpcTests.py @@ -19,10 +19,7 @@ # import threading import socket -from contextlib import contextmanager -from functools import partial from contextlib import closing -import json from testrunner import VdsmTestCase as TestCaseBase, \ expandPermutations, \ @@ -30,135 +27,16 @@ dummyTextGenerator from jsonrpc import \ - tcpReactor, \ JsonRpcError, \ - JsonRpcServer, \ JsonRpcMethodNotFoundError, \ JsonRpcInternalError -PORT_RANGE = xrange(49152, 65535) - - -_distributedPorts = [] - - -def _getFreePort(): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - with closing(sock): - for port in PORT_RANGE: - if port in _distributedPorts: - continue - - try: - sock.bind(("0.0.0.0", port)) - except: - continue - - _distributedPorts.append(port) - return port - else: - raise Exception("Could not find a free port") - - -@contextmanager -def _tcpServerConstructor(messageHandler): - port = _getFreePort() - address = ("localhost", port) - reactor = tcpReactor.TCPReactor(address, messageHandler) - - try: - yield reactor, partial(TCPReactorClient, address) - finally: - reactor.stop() - - -REACTOR_CONSTRUCTORS = {"tcp": _tcpServerConstructor} -REACTOR_TYPE_PERMUTATIONS = [[r] for r in REACTOR_CONSTRUCTORS.iterkeys()] - - -@contextmanager -def constructReactor(tp, messageHandler): - with REACTOR_CONSTRUCTORS[tp](messageHandler) as res: - yield res - - -@contextmanager -def constructServer(tp, bridge): - server = JsonRpcServer(bridge) - with constructReactor(tp, server) as (reactor, clientFactory): - 
reactor.start_listening() - t = threading.Thread(target=reactor.process_requests) - t.setDaemon(True) - t.start() - - def jsonClientFactory(): - return JsonRpcClient(clientFactory()) - - yield server, jsonClientFactory +from jsonRpcUtils import * class _EchoMessageHandler(object): def handleMessage(self, msgCtx): msgCtx.sendReply(msgCtx.data) - - -class TCPReactorClient(object): - def __init__(self, address): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.address = address - - def connect(self): - self.sock.connect(self.address) - - def sendMessage(self, msg, timeout=None): - msg = tcpReactor._Size.pack(len(msg)) + msg - self.sock.settimeout(timeout) - while msg: - sent = self.sock.send(msg) - msg = msg[sent:] - - def recvMessage(self, timeout=None): - self.sock.settimeout(timeout) - rawSize = self.sock.recv(tcpReactor._Size.size) - size = tcpReactor._Size.unpack(rawSize)[0] - buff = "" - while (size - len(buff)) > 0: - buff += self.sock.recv(size) - - return buff - - def close(self): - self.sock.close() - - -class JsonRpcClient(object): - def __init__(self, reactorClient): - self._transport = reactorClient - - def connect(self): - self._transport.connect() - - def callMethod(self, methodName, params=(), reqId=None): - msg = {'jsonrpc': '2.0', - 'method': methodName, - 'params': params, - 'id': reqId} - - self._transport.sendMessage(json.dumps(msg, 'utf-8')) - # Notifications have no repsonse - if reqId is None: - return - - resp = self._transport.recvMessage() - resp = json.loads(resp) - if resp.get('error') is not None: - raise JsonRpcError(resp['error']['code'], - resp['error']['message']) - - return resp.get('result') - - def close(self): - self._transport.close() @expandPermutations @@ -177,8 +55,8 @@ self.log.error("Server died unexpectedly", exc_info=True) self.fail("Server died: (%s) %s" % (type(e), e)) - with constructReactor(reactorType, msgHandler) as (reactor, - clientFactory): + with constructReactor(reactorType, msgHandler) \ + 
as (reactor, clientFactory): reactor.start_listening() t = threading.Thread(target=serve, args=(reactor,)) t.setDaemon(True) diff --git a/tests/jsonRpcUtils.py b/tests/jsonRpcUtils.py new file mode 100644 index 0000000..2d782fc --- /dev/null +++ b/tests/jsonRpcUtils.py @@ -0,0 +1,73 @@ +import threading +import socket +from contextlib import closing +from contextlib import contextmanager +from functools import partial + +from jsonrpc import \ + JsonRpcServer, \ + tcpReactor +from jsonrpc.client import \ + JsonRpcClient, \ + TCPReactorClient + +_PORT_RANGE = xrange(49152, 65535) + + +_distributedPorts = [] + + +def getFreePort(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + with closing(sock): + for port in _PORT_RANGE: + if port in _distributedPorts: + continue + + try: + sock.bind(("0.0.0.0", port)) + except: + continue + + _distributedPorts.append(port) + return port + else: + raise Exception("Could not find a free port") + + +@contextmanager +def _tcpServerConstructor(messageHandler): + port = getFreePort() + address = ("localhost", port) + reactor = tcpReactor.TCPReactor(address, messageHandler) + + try: + yield reactor, partial(TCPReactorClient, address) + finally: + reactor.stop() + + +_REACTOR_CONSTRUCTORS = {"tcp": _tcpServerConstructor} +REACTOR_TYPE_PERMUTATIONS = [[r] for r in _REACTOR_CONSTRUCTORS.iterkeys()] + + +@contextmanager +def constructReactor(tp, messageHandler): + with _REACTOR_CONSTRUCTORS[tp](messageHandler) as res: + yield res + + +@contextmanager +def constructServer(tp, bridge): + server = JsonRpcServer(bridge) + with constructReactor(tp, server) as (reactor, clientFactory): + reactor.start_listening() + t = threading.Thread(target=reactor.process_requests) + t.setDaemon(True) + t.start() + + def jsonClientFactory(): + return JsonRpcClient(clientFactory()) + + yield server, jsonClientFactory + diff --git a/vdsm_api/jsonrpc/client.py b/vdsm_api/jsonrpc/client.py new file mode 100644 index 0000000..5504dfd --- /dev/null +++ 
b/vdsm_api/jsonrpc/client.py @@ -0,0 +1,63 @@ +import json +import socket + +from jsonrpc import \ + JsonRpcError, \ + tcpReactor + +class JsonRpcClient(object): + def __init__(self, reactorClient): + self._transport = reactorClient + + def connect(self): + self._transport.connect() + + def callMethod(self, methodName, params=(), reqId=None): + msg = {'jsonrpc': '2.0', + 'method': methodName, + 'params': params, + 'id': reqId} + + self._transport.sendMessage(json.dumps(msg, 'utf-8')) + # Notifications have no response + if reqId is None: + return + + resp = self._transport.recvMessage() + resp = json.loads(resp) + if resp.get('error') is not None: + raise JsonRpcError(resp['error']['code'], + resp['error']['message']) + + return resp.get('result') + + def close(self): + self._transport.close() + +class TCPReactorClient(object): + def __init__(self, address): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.address = address + + def connect(self): + self.sock.connect(self.address) + + def sendMessage(self, msg, timeout=None): + msg = tcpReactor._Size.pack(len(msg)) + msg + self.sock.settimeout(timeout) + while msg: + sent = self.sock.send(msg) + msg = msg[sent:] + + def recvMessage(self, timeout=None): + self.sock.settimeout(timeout) + rawSize = self.sock.recv(tcpReactor._Size.size) + size = tcpReactor._Size.unpack(rawSize)[0] + buff = "" + while (size - len(buff)) > 0: + buff += self.sock.recv(size) + + return buff + + def close(self): + self.sock.close() -- To view, visit http://gerrit.ovirt.org/11122 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I78f9546c0bfb2348510340922ba95569b1827830 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Adam Litke <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
