Nir Soffer has posted comments on this change.

Change subject: json-rpc: Protocol detection
......................................................................

Patch Set 20:

(22 comments)

http://gerrit.ovirt.org/#/c/26300/20/vdsm/protocolDetector.py
File vdsm/protocolDetector.py:

Line 32:
Line 33: class MultiProtocolAcceptor:
Line 34:     log = logging.getLogger("protocolDetector.MultiProtocolAcceptor")
Line 35:
Line 36:     READ_ONLY = (select.POLLIN | select.POLLPRI | select.POLLHUP

Consider renaming this to READ_ONLY_MASK

Line 37:                  | select.POLLERR)
Line 38:
Line 39:     def __init__(self, host, port, sslctx=None):
Line 40:         self._sslctx = sslctx

Line 47:
Line 48:         self.poller = select.poll()
Line 49:         self.socket = self._create_socket(host, port)
Line 50:         self.pending_connections = {}
Line 51:         self.handlers = None

Why are these public? It looks like this object should not have any public
instance variables.

Line 52:
Line 53:         self._jsonBinding = None
Line 54:         self._xmlBinding = None
Line 55:

Line 58:         flags = flags | os.O_NONBLOCK
Line 59:         fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Line 60:
Line 61:     def serve_forever(self):
Line 62:         self._isRunning = True

You use mixedCase for only three instance variables in this class:
_isRunning, _jsonBinding, and _xmlBinding. Everything else uses
lowercase_with_underscores.

Please be consistent and use the same style throughout this module. Since
most of this code uses underscores, and that style is much more readable, I
suggest that you use it.

Line 63:         self.poller.register(self.socket, self.READ_ONLY)
Line 64:         self.poller.register(self._read_fd, self.READ_ONLY)
Line 65:         try:
Line 66:             while self._isRunning:

Line 65:         try:
Line 66:             while self._isRunning:
Line 67:                 events = self.poller.poll()
Line 68:
Line 69:                 for fd, flag in events:

The Python documentation describes poll() as returning (fd, event) pairs. We
should use the same terms.
https://docs.python.org/2/library/select.html#select.poll.poll

Line 70:                     if flag & (select.POLLIN | select.POLLPRI):
Line 71:                         if fd is self._read_fd:
Line 72:                             self._cleanup_wakeup_pipe()
Line 73:                             self._cleanup_pending_connections()

Line 81:             for handler in self.handlers:
Line 82:                 handler.stop()
Line 83:
Line 84:             self._jsonBinding.stop()
Line 85:             self.poller.unregister(self.socket)

Why don't you unregister self._read_fd?

Line 86:             self.socket.close()
Line 87:
Line 88:     def _trigger_connection_cleanup(self):
Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()

Line 82:                 handler.stop()
Line 83:
Line 84:             self._jsonBinding.stop()
Line 85:             self.poller.unregister(self.socket)
Line 86:             self.socket.close()

You must also close the pipe, otherwise we are leaking two file descriptors.

Finally, you should close the pending connections. Maybe they were just
closed because of the wakeup, but it is also possible that the event loop
woke up because of an event on one of the sockets, and
_cleanup_pending_connections was not called.

So you should:

    for fd, (accepted, client_socket) in self.pending_connections.items():
        self._remove_connection(client_socket)
        client_socket.close()

Line 87:
Line 88:     def _trigger_connection_cleanup(self):
Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()
Line 90:         self.wakeup()

Line 85:             self.poller.unregister(self.socket)
Line 86:             self.socket.close()
Line 87:
Line 88:     def _trigger_connection_cleanup(self):
Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()

This will create a new thread every 30 seconds. It would be better to have a
single thread that calls wakeup every 30 seconds instead.

Or even better, have no thread, and use the event loop:

In init:

    self._next_cleanup = 0

In serve_forever():

    self._next_cleanup = time.time() + CLEANUP_INTERVAL
    while self._is_running:
        timeout = max(self._next_cleanup - time.time(), 0)
        # poll() takes its timeout in milliseconds
        events = self.poller.poll(timeout * 1000)
        for fd, flag in events:
            handle fd events...
        now = time.time()
        if now > self._next_cleanup:
            self._next_cleanup = now + CLEANUP_INTERVAL
            self._cleanup_pending_connections()

I think we have enough threads in vdsm :-)

Line 90:         self.wakeup()
Line 91:
Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:

Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()
Line 90:         self.wakeup()
Line 91:
Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:

Iterating over the dictionary returns the keys, and you are trying to get
the items. You need:

    for fd, (accepted, client_socket) in self.pending_connections.items():
        ...

Line 94:             if time.time() - accepted > 30.0:
Line 95:                 client_socket.close()
Line 96:                 del self.pending_connections[fd]
Line 97:

Line 90:         self.wakeup()
Line 91:
Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:
Line 94:             if time.time() - accepted > 30.0:

The timeout should be a constant.

Line 95:                 client_socket.close()
Line 96:                 del self.pending_connections[fd]
Line 97:
Line 98:     def detect_protocol(self, data):

Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:
Line 94:             if time.time() - accepted > 30.0:
Line 95:                 client_socket.close()
Line 96:                 del self.pending_connections[fd]

What about removing the fd from the poller?

I think that you need methods for adding and removing sockets, because both
adding and removing must modify both the poller and the pending connections.

Then this will be:

    if time.time() - accepted > DETECTION_TIMEOUT:
        self._remove_connection(client_socket)
        client_socket.close()

Line 97:
Line 98:     def detect_protocol(self, data):
Line 99:         for handler in self.handlers:
Line 100:             if handler.detect(data):

Line 98:     def detect_protocol(self, data):
Line 99:         for handler in self.handlers:
Line 100:             if handler.detect(data):
Line 101:                 return handler
Line 102:         return None

It would be clearer if you raise here:

    raise CannotDetectProtocol(data)

We do not expect to get an unknown protocol here.

Line 103:
Line 104:     def add_json_binding(self, jsonBinding):
Line 105:         self._jsonBinding = jsonBinding
Line 106:         self._jsonBinding.start()

Line 102:         return None
Line 103:
Line 104:     def add_json_binding(self, jsonBinding):
Line 105:         self._jsonBinding = jsonBinding
Line 106:         self._jsonBinding.start()

In patch set 17 I asked why you start the bindings, suggesting that somebody
else should be responsible for this, and you replied "Done". Can you explain
why you are still doing it?

Line 107:
Line 108:     def add_xml_binding(self, xmlBinding):
Line 109:         self._xmlBinding = xmlBinding
Line 110:         self._xmlBinding.start()

Line 110:         self._xmlBinding.start()
Line 111:
Line 112:     def init_handlers(self):
Line 113:         self.handlers = [_StompDetector(self._jsonBinding),
Line 114:                          _XmlDetector(self._xmlBinding)]

Do you have any reason to initialize the handlers in a separate method?

This way you must first call add_json_binding and add_xml_binding, and then
call init_handlers. Any other order will be wrong.

It is much more robust to do something like this:

    def add_detector(self, detector):
        self.handlers.append(detector)

The caller of add_detector will do:

    detector = StompDetector(jsonBindings)
    acceptor.add_detector(detector)

This removes the dependency on the detectors, call order, and bindings.
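
To make the add_detector and CannotDetectProtocol suggestions concrete, here
is a minimal sketch of how the two could fit together. It is not the code
from the patch: Acceptor and PrefixDetector are hypothetical stand-ins, the
name attribute exists only for illustration, and the "CONNECT" prefix for
STOMP is an assumption about what the STOMP detector would look for.

```python
class CannotDetectProtocol(Exception):
    """Raised when no registered detector recognizes the peeked bytes."""


class PrefixDetector(object):
    """Hypothetical detector: matches data starting with a fixed prefix."""

    def __init__(self, name, prefix):
        self.name = name
        self._prefix = prefix

    def detect(self, data):
        return data.startswith(self._prefix)


class Acceptor(object):
    """Stand-in for MultiProtocolAcceptor; only the detector registry."""

    def __init__(self):
        # Starts empty, so there is no required call order for setup.
        self._detectors = []

    def add_detector(self, detector):
        self._detectors.append(detector)

    def detect_protocol(self, data):
        for detector in self._detectors:
            if detector.detect(data):
                return detector
        # Unknown protocol is an error, not a normal return value.
        raise CannotDetectProtocol(data)


acceptor = Acceptor()
acceptor.add_detector(PrefixDetector("xml", b"POST /RPC2"))
acceptor.add_detector(PrefixDetector("stomp", b"CONNECT"))  # assumed prefix
```

With this shape the acceptor never needs to know about the bindings; the
caller builds each detector with whatever binding it needs and registers it.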

Line 115:
Line 116:     def stop(self):
Line 117:         self._isRunning = False
Line 118:         self.wakeup()

Line 136:         client_socket.setblocking(0)
Line 137:
Line 138:         self.pending_connections[client_socket.fileno()] = (time.time(),
Line 139:                                                             client_socket)
Line 140:         self.poller.register(client_socket, self.READ_ONLY)

Let's extract:

    def _add_connection(self, client_socket):
        client_socket.setblocking(0)
        self.pending_connections[client_socket.fileno()] = (time.time(),
                                                            client_socket)
        self.poller.register(client_socket, self.READ_ONLY)

and:

    def _remove_connection(self, client_socket):
        self.poller.unregister(client_socket)
        del self.pending_connections[client_socket.fileno()]
        client_socket.setblocking(1)

Line 141:
Line 142:     def _handle_connection_read(self, fd):
Line 143:         try:
Line 144:             _, client_socket = self.pending_connections[fd]

Line 140:         self.poller.register(client_socket, self.READ_ONLY)
Line 141:
Line 142:     def _handle_connection_read(self, fd):
Line 143:         try:
Line 144:             _, client_socket = self.pending_connections[fd]

Why is this inside the try block? It cannot raise socket.error. Please keep
try-blocks minimal.

Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")

Line 141:
Line 142:     def _handle_connection_read(self, fd):
Line 143:         try:
Line 144:             _, client_socket = self.pending_connections[fd]
Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)

Why do you copy up to 4096 bytes, when you need only a few bytes to detect
the protocol? Do you have a good reason for this?

This can be especially problematic for the xml detector, which looks for
/RPC2 inside the data, and may find some "/RPC2" somewhere near the end of
the read data.

Fixing the detector to check that the path starts with "/RPC2" will
eliminate this issue, but the large read still confuses the reader about
your intention, which seems to be, judging by the detectors' code, a very
quick detection that looks only at the first few bytes of the request.

The longest match used in this module is the xml match, needing 10 bytes:

    POST /RPC2

Of course this class should not know anything about the detectors'
internals, but if we add a simple interface like required_size to the
detectors, we can read just the bytes we need:

    self._required_size = max(h.required_size for h in self.handlers)

And use this value for peeking:

    data = client_socket.recv(self._required_size, socket.MSG_PEEK)

This makes your intent very clear.

Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")
Line 149:                 client_socket.close()

Line 144:             _, client_socket = self.pending_connections[fd]
Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")

Please log the error in this case:

    self.log.warning("Not able to read data: %s", e)

Note: log.warn is an undocumented alias of log.warning, so please use the
documented name.

Line 149:                 client_socket.close()
Line 150:             return
Line 151:
Line 152:         handler = self.detect_protocol(data)

Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")
Line 149:                 client_socket.close()

But you did not remove the socket from the pending_connections and poller.

Please use:

    self._remove_connection(client_socket)
    client_socket.close()

Line 150:             return
Line 151:
Line 152:         handler = self.detect_protocol(data)
Line 153:         if handler:

Line 149:                 client_socket.close()
Line 150:             return
Line 151:
Line 152:         handler = self.detect_protocol(data)
Line 153:         if handler:

It would be clearer to handle the error first, as you do above, and continue
with the normal flow otherwise:

    handler = self.detect_protocol(data)
    if handler is None:
        handle error...
    handle it

Line 154:             self.poller.unregister(client_socket)
Line 155:             del self.pending_connections[fd]
Line 156:             client_socket.setblocking(1)
Line 157:             handler.handleSocket(client_socket,

Line 152:         handler = self.detect_protocol(data)
Line 153:         if handler:
Line 154:             self.poller.unregister(client_socket)
Line 155:             del self.pending_connections[fd]
Line 156:             client_socket.setblocking(1)

Lines 154-156 can be replaced with _remove_connection

Line 157:             handler.handleSocket(client_socket,
Line 158:                                  client_socket.getsockname())
Line 159:         else:
Line 160:             self.log.warn("Unrecognized protocol")

Line 157:             handler.handleSocket(client_socket,
Line 158:                                  client_socket.getsockname())
Line 159:         else:
Line 160:             self.log.warn("Unrecognized protocol")
Line 161:             client_socket.close()

Again, you don't remove the socket from the pending dict or the poller.

Line 162:
Line 163:     def _create_socket(self, host, port):
Line 164:         addr = socket.getaddrinfo(host, port, socket.AF_INET,
Line 165:                                   socket.SOCK_STREAM)

Line 223:         if not self.xmlBinding:
Line 224:             self.log.warn("Xml handler not initialized")
Line 225:
Line 226:     def detect(self, data):
Line 227:         return data.startswith("POST") and "/RPC2" in data

This should be something like:

    try:
        method, rest = data.split(" ", 1)
    except ValueError:
        return False
    return method == "POST" and rest.startswith("/RPC2")

Otherwise this request will match while it should not:

    POST / HTTP/1.1
    Surprise: /RPC2
    ...

It is unlikely that someone will send this request, but why have a fragile
detector when it is so easy to have a robust one?

Line 228:
Line 229:     def handleSocket(self, client_socket, socket_address):
Line 230:         if self.xmlBinding:
Line 231:             self.xmlBinding.add_socket(client_socket, socket_address)

--
To view, visit http://gerrit.ovirt.org/26300
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Id739a40e2b37dcc175137ec91cd5ec166ad24a75
Gerrit-PatchSet: 20
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <[email protected]>
Gerrit-Reviewer: Barak Azulay <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Piotr Kliczewski <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: Yeela Kaplan <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
