Nir Soffer has posted comments on this change.

Change subject: json-rpc: Protocol detection
......................................................................


Patch Set 20:

(22 comments)

http://gerrit.ovirt.org/#/c/26300/20/vdsm/protocolDetector.py
File vdsm/protocolDetector.py:

Line 32: 
Line 33: class MultiProtocolAcceptor:
Line 34:     log = logging.getLogger("protocolDetector.MultiProtocolAcceptor")
Line 35: 
Line 36:     READ_ONLY = (select.POLLIN | select.POLLPRI | select.POLLHUP
Consider renaming this to READ_ONLY_MASK
Line 37:                  | select.POLLERR)
Line 38: 
Line 39:     def __init__(self, host, port, sslctx=None):
Line 40:         self._sslctx = sslctx


Line 47: 
Line 48:         self.poller = select.poll()
Line 49:         self.socket = self._create_socket(host, port)
Line 50:         self.pending_connections = {}
Line 51:         self.handlers = None
Why are these public? It looks like this object should not have any public
instance variables.
Line 52: 
Line 53:         self._jsonBinding = None
Line 54:         self._xmlBinding = None
Line 55: 


Line 58:         flags = flags | os.O_NONBLOCK
Line 59:         fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Line 60: 
Line 61:     def serve_forever(self):
Line 62:         self._isRunning = True
You use mixedCase only for three instance variables in this class: _isRunning,
_jsonBinding, and _xmlBinding. Everything else uses underscores. Please be
consistent and use the same style throughout this module. Since most of this
code uses underscores, and that style is much more readable, I suggest that you
use it.
Line 63:         self.poller.register(self.socket, self.READ_ONLY)
Line 64:         self.poller.register(self._read_fd, self.READ_ONLY)
Line 65:         try:
Line 66:             while self._isRunning:


Line 65:         try:
Line 66:             while self._isRunning:
Line 67:                 events = self.poller.poll()
Line 68: 
Line 69:                 for fd, flag in events:
The Python documentation says poll() returns a list of (fd, event) tuples. We
should use the same terms.
https://docs.python.org/2/library/select.html#select.poll.poll
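A minimal, Linux-oriented illustration of the (fd, event) terminology. A plain pipe stands in for the acceptor's wakeup pipe; all names here are illustrative, not taken from the patch:

```python
import os
import select

# A pipe stands in for the acceptor's wakeup pipe (illustrative only).
read_fd, write_fd = os.pipe()
poller = select.poll()
poller.register(read_fd, select.POLLIN | select.POLLPRI)

os.write(write_fd, b"x")  # make the read end readable

# poll() takes its timeout in milliseconds and returns (fd, event) tuples.
events = poller.poll(1000)
readable = [fd for fd, event in events
            if event & (select.POLLIN | select.POLLPRI)]

poller.unregister(read_fd)
os.close(read_fd)
os.close(write_fd)
```

Naming the loop variables after the documented tuple makes checks like `event & select.POLLIN` read naturally.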
Line 70:                     if flag & (select.POLLIN | select.POLLPRI):
Line 71:                         if fd is self._read_fd:
Line 72:                             self._cleanup_wakeup_pipe()
Line 73:                             self._cleanup_pending_connections()


Line 81:             for handler in self.handlers:
Line 82:                 handler.stop()
Line 83: 
Line 84:             self._jsonBinding.stop()
Line 85:             self.poller.unregister(self.socket)
Why don't you unregister self._read_fd?
Line 86:             self.socket.close()
Line 87: 
Line 88:     def _trigger_connection_cleanup(self):
Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()


Line 82:                 handler.stop()
Line 83: 
Line 84:             self._jsonBinding.stop()
Line 85:             self.poller.unregister(self.socket)
Line 86:             self.socket.close()
You must also close the pipe; otherwise we are leaking two file descriptors.

Finally, you should close the pending connections. They may have just been
closed because of the wakeup, but it is also possible that the event loop woke
up because of an event on one of the sockets, and _cleanup_pending_connections
was not called. So you should:

    # Iterate over a copy; _remove_connection deletes entries from the dict.
    for fd, (accepted, client_socket) in list(self.pending_connections.items()):
        self._remove_connection(client_socket)
        client_socket.close()
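A trimmed-down sketch of the shutdown path requested above. It assumes the _add_connection/_remove_connection helpers suggested later in this review; the class name, shutdown(), and _write_fd (the write end of the wakeup pipe) are hypothetical names, not taken from the patch:

```python
import os
import select
import socket
import time


class AcceptorShutdownSketch(object):
    """Hypothetical acceptor reduced to what the shutdown path needs."""

    READ_ONLY_MASK = (select.POLLIN | select.POLLPRI | select.POLLHUP
                      | select.POLLERR)

    def __init__(self):
        self._poller = select.poll()
        # Wakeup pipe; _write_fd is an assumed name for the write end.
        self._read_fd, self._write_fd = os.pipe()
        self._poller.register(self._read_fd, self.READ_ONLY_MASK)
        self._pending_connections = {}

    def _add_connection(self, client_socket):
        client_socket.setblocking(0)
        entry = (time.time(), client_socket)
        self._pending_connections[client_socket.fileno()] = entry
        self._poller.register(client_socket, self.READ_ONLY_MASK)

    def _remove_connection(self, client_socket):
        self._poller.unregister(client_socket)
        del self._pending_connections[client_socket.fileno()]
        client_socket.setblocking(1)

    def shutdown(self):
        # Close connections that never finished protocol detection.
        # Iterate over a copy, because _remove_connection mutates the dict.
        for _, client_socket in list(self._pending_connections.values()):
            self._remove_connection(client_socket)
            client_socket.close()
        # Unregister the wakeup pipe and close both of its descriptors.
        self._poller.unregister(self._read_fd)
        os.close(self._read_fd)
        os.close(self._write_fd)


acceptor = AcceptorShutdownSketch()
server_end, peer_end = socket.socketpair()
acceptor._add_connection(server_end)
acceptor.shutdown()
peer_end.close()
```

Unregistering before closing matters here: once a socket is closed its fileno() is no longer valid, so both the poller and the dict must be updated first.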
Line 87: 
Line 88:     def _trigger_connection_cleanup(self):
Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()
Line 90:         self.wakeup()


Line 85:             self.poller.unregister(self.socket)
Line 86:             self.socket.close()
Line 87: 
Line 88:     def _trigger_connection_cleanup(self):
Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()
This will create a new thread every 30 seconds. It would be better to have a
single thread that calls wakeup every 30 seconds instead.

Or even better, have no thread at all, and use the event loop:

In __init__:

    self._next_cleanup = 0

In serve_forever():

    self._next_cleanup = time.time() + CLEANUP_INTERVAL

    while self._is_running:
        # poll() takes its timeout in milliseconds, not seconds.
        timeout = max(self._next_cleanup - time.time(), 0) * 1000
        events = self.poller.poll(timeout)

        for fd, event in events:
            pass  # handle fd events here

        now = time.time()
        if now >= self._next_cleanup:
            self._next_cleanup = now + CLEANUP_INTERVAL
            self._cleanup_pending_connections()

I think we have enough threads in vdsm :-)
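A runnable sketch of the timer-free loop above, under stated assumptions: the class and serve_for() are hypothetical, serve_for() is a bounded stand-in for serve_forever() so the demo terminates, and CLEANUP_INTERVAL is shortened to 50 ms for the demo:

```python
import math
import os
import select
import time

# Hypothetical interval, shortened so the demo finishes quickly;
# the acceptor would use something like 30 seconds.
CLEANUP_INTERVAL = 0.05


class TimerlessLoopSketch(object):
    """Hypothetical event loop: periodic cleanup driven by poll() timeouts."""

    def __init__(self):
        self._poller = select.poll()
        self._read_fd, self._write_fd = os.pipe()
        self._poller.register(self._read_fd, select.POLLIN)
        self._is_running = False
        self._next_cleanup = 0
        self.cleanups = 0

    def _cleanup_pending_connections(self):
        self.cleanups += 1  # stand-in for closing expired connections

    def serve_for(self, duration):
        # A bounded variant of serve_forever(), so the demo terminates.
        self._is_running = True
        deadline = time.time() + duration
        self._next_cleanup = time.time() + CLEANUP_INTERVAL
        while self._is_running and time.time() < deadline:
            # poll() wants milliseconds; rounding up keeps a sub-millisecond
            # remainder from turning into a busy 0 ms poll.
            remaining = max(self._next_cleanup - time.time(), 0)
            events = self._poller.poll(int(math.ceil(remaining * 1000)))
            for fd, event in events:
                if fd == self._read_fd:
                    os.read(self._read_fd, 1024)  # drain wakeup bytes
            now = time.time()
            if now >= self._next_cleanup:
                self._next_cleanup = now + CLEANUP_INTERVAL
                self._cleanup_pending_connections()

    def close(self):
        self._poller.unregister(self._read_fd)
        os.close(self._read_fd)
        os.close(self._write_fd)


loop = TimerlessLoopSketch()
loop.serve_for(0.3)
loop.close()
```

The poll timeout doubles as the cleanup timer, so no extra thread is needed; a wakeup write only shortens the current wait.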
Line 90:         self.wakeup()
Line 91: 
Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:


Line 89:         threading.Timer(30, self._trigger_connection_cleanup).start()
Line 90:         self.wakeup()
Line 91: 
Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:
Iterating over the dictionary returns the keys, but you are trying to get the
items. You need (copying the items, since the loop deletes entries):

    for fd, (accepted, client_socket) in list(self.pending_connections.items()):
        ...
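A standalone illustration of the difference, with made-up fds and plain strings standing in for sockets:

```python
# Illustrative data: fd -> (accept time, socket placeholder).
pending_connections = {5: (1000.0, "sock-a"), 7: (1010.0, "sock-b")}

# Iterating over a dict yields its keys only, so each int key is unpacked
# into (fd, (accepted, client_socket)) and Python raises TypeError.
unpack_error = None
try:
    for (fd, (accepted, client_socket)) in pending_connections:
        pass
except TypeError as e:
    unpack_error = e

# items() yields (key, value) pairs. The list() copy also makes it safe
# to delete entries inside the loop on Python 3, where items() is a view.
expired = []
for fd, (accepted, client_socket) in list(pending_connections.items()):
    if accepted < 1005.0:
        expired.append(client_socket)
        del pending_connections[fd]
```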
Line 94:             if time.time() - accepted > 30.0:
Line 95:                 client_socket.close()
Line 96:                 del self.pending_connections[fd]
Line 97: 


Line 90:         self.wakeup()
Line 91: 
Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:
Line 94:             if time.time() - accepted > 30.0:
The timeout should be a constant.
Line 95:                 client_socket.close()
Line 96:                 del self.pending_connections[fd]
Line 97: 
Line 98:     def detect_protocol(self, data):


Line 92:     def _cleanup_pending_connections(self):
Line 93:         for (fd, (accepted, client_socket)) in self.pending_connections:
Line 94:             if time.time() - accepted > 30.0:
Line 95:                 client_socket.close()
Line 96:                 del self.pending_connections[fd]
What about removing the fd from the poller?

I think that you need methods for adding and removing sockets, because both
adding and removing must modify both the poller and the pending connections.

Then this will be:

    if time.time() - accepted > DETECTION_TIMEOUT:
        self._remove_connection(client_socket)
        client_socket.close()
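A small sketch of the expiry check with a named timeout. DETECTION_TIMEOUT follows the comment above, while cleanup_expired() and its remove_connection callback are hypothetical; the callback plays the role of the suggested _remove_connection method:

```python
import socket
import time

# Constant name taken from the comment above; the value mirrors the patch.
DETECTION_TIMEOUT = 30.0  # seconds


def cleanup_expired(pending_connections, remove_connection, now=None):
    """Close pending connections older than DETECTION_TIMEOUT.

    remove_connection stands in for _remove_connection: in the acceptor it
    would unregister the socket from the poller and drop it from the dict.
    """
    if now is None:
        now = time.time()
    for fd, (accepted, client_socket) in list(pending_connections.items()):
        if now - accepted > DETECTION_TIMEOUT:
            remove_connection(client_socket)
            client_socket.close()


old_sock, old_peer = socket.socketpair()
new_sock, new_peer = socket.socketpair()
pending = {old_sock.fileno(): (0.0, old_sock),
           new_sock.fileno(): (100.0, new_sock)}


def remove(sock):
    # Minimal stand-in for _remove_connection: no poller in this demo.
    del pending[sock.fileno()]


cleanup_expired(pending, remove, now=100.0)
```

Passing `now` explicitly keeps the check deterministic in the demo; the acceptor would simply call it with the current time.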
Line 97: 
Line 98:     def detect_protocol(self, data):
Line 99:         for handler in self.handlers:
Line 100:             if handler.detect(data):


Line 98:     def detect_protocol(self, data):
Line 99:         for handler in self.handlers:
Line 100:             if handler.detect(data):
Line 101:                 return handler
Line 102:         return None
It would be clearer to raise here:

        raise CannotDetectProtocol(data)

We do not expect to get an unknown protocol here.
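A sketch of what raising could look like. CannotDetectProtocol comes from the comment above, but its fields, the PrefixDetector class, and the module-level detect_protocol() function are hypothetical:

```python
class CannotDetectProtocol(Exception):
    """Hypothetical exception: no detector recognized the peeked data."""

    def __init__(self, data):
        Exception.__init__(self, "Cannot detect protocol: %r" % (data,))
        self.data = data


class PrefixDetector(object):
    """Hypothetical detector that matches a fixed byte prefix."""

    def __init__(self, prefix):
        self._prefix = prefix

    def detect(self, data):
        return data.startswith(self._prefix)


def detect_protocol(handlers, data):
    for handler in handlers:
        if handler.detect(data):
            return handler
    # Unknown protocols are unexpected, so fail loudly instead of
    # returning None and making every caller check for it.
    raise CannotDetectProtocol(data)


stomp = PrefixDetector(b"CONNECT")
matched = detect_protocol([stomp], b"CONNECT\naccept-version:1.2\n\n")
try:
    detect_protocol([stomp], b"GET / HTTP/1.1\r\n")
    error = None
except CannotDetectProtocol as e:
    error = e
```

Keeping the offending bytes on the exception makes it easy for the caller to log what was actually received before closing the socket.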
Line 103: 
Line 104:     def add_json_binding(self, jsonBinding):
Line 105:         self._jsonBinding = jsonBinding
Line 106:         self._jsonBinding.start()


Line 102:         return None
Line 103: 
Line 104:     def add_json_binding(self, jsonBinding):
Line 105:         self._jsonBinding = jsonBinding
Line 106:         self._jsonBinding.start()
In patch set 17 I asked why you start the bindings, suggesting that somebody 
else will be responsible for this, and you replied "Done". Can you explain why 
you are still doing it?
Line 107: 
Line 108:     def add_xml_binding(self, xmlBinding):
Line 109:         self._xmlBinding = xmlBinding
Line 110:         self._xmlBinding.start()


Line 110:         self._xmlBinding.start()
Line 111: 
Line 112:     def init_handlers(self):
Line 113:         self.handlers = [_StompDetector(self._jsonBinding),
Line 114:                          _XmlDetector(self._xmlBinding)]
Do you have any reason to initialize the handlers in a separate method? This
way you must first call add_json_binding and add_xml_binding, and only then
call init_handlers. Any other order will be wrong.

It is much more robust if you do something like this:

    # In __init__: self.handlers = []
    def add_detector(self, detector):
        self.handlers.append(detector)

The caller of add_detector will do:

    detector = StompDetector(jsonBinding)
    acceptor.add_detector(detector)

This removes the dependency on the detectors, the call order, and the bindings.
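A sketch of the registration style suggested above. Every class here (FakeBinding, BoundDetector, AcceptorSketch) and the find_handler() method are hypothetical; the point is only that each detector receives its binding when it is built, so there is no ordering constraint between setup calls:

```python
class FakeBinding(object):
    """Hypothetical stand-in for the JSON-RPC or XML-RPC binding."""

    def __init__(self, name):
        self.name = name


class BoundDetector(object):
    """Hypothetical detector that receives its binding when it is built."""

    def __init__(self, prefix, binding):
        self._prefix = prefix
        self.binding = binding

    def detect(self, data):
        return data.startswith(self._prefix)


class AcceptorSketch(object):
    def __init__(self):
        self._handlers = []  # a list, not None, so add_detector can append

    def add_detector(self, detector):
        # No separate init_handlers() step: each detector arrives complete.
        self._handlers.append(detector)

    def find_handler(self, data):
        for handler in self._handlers:
            if handler.detect(data):
                return handler
        return None


acceptor = AcceptorSketch()
# The order of these two calls does not matter.
acceptor.add_detector(BoundDetector(b"POST /RPC2", FakeBinding("xml")))
acceptor.add_detector(BoundDetector(b"CONNECT", FakeBinding("json")))
```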
Line 115: 
Line 116:     def stop(self):
Line 117:         self._isRunning = False
Line 118:         self.wakeup()


Line 136:         client_socket.setblocking(0)
Line 137: 
Line 138:         self.pending_connections[client_socket.fileno()] = (time.time(),
Line 139:                                                             client_socket)
Line 140:         self.poller.register(client_socket, self.READ_ONLY)
Let's extract:

    def _add_connection(self, client_socket):
        client_socket.setblocking(0)
        self.pending_connections[client_socket.fileno()] = (time.time(),
                                                            client_socket)
        self.poller.register(client_socket, self.READ_ONLY)

and:

    def _remove_connection(self, client_socket):
        self.poller.unregister(client_socket)
        del self.pending_connections[client_socket.fileno()]
        client_socket.setblocking(1)
Line 141: 
Line 142:     def _handle_connection_read(self, fd):
Line 143:         try:
Line 144:             _, client_socket = self.pending_connections[fd]


Line 140:         self.poller.register(client_socket, self.READ_ONLY)
Line 141: 
Line 142:     def _handle_connection_read(self, fd):
Line 143:         try:
Line 144:             _, client_socket = self.pending_connections[fd]
Why is this inside the try block? It cannot raise socket.error. Please keep
try blocks minimal.
Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")


Line 141: 
Line 142:     def _handle_connection_read(self, fd):
Line 143:         try:
Line 144:             _, client_socket = self.pending_connections[fd]
Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Why do you copy up to 4096 bytes, when you need only a few bytes to detect the
protocol? Do you have a good reason for this?

This can be especially problematic for the XML detector, which looks for /RPC2
anywhere inside the data, and may find some "/RPC2" near the end of the data
that was read.

Fixing the detector to check that the path starts with "/RPC2" will eliminate
this issue, but the large read still confuses the reader about your intention,
which, looking at the detectors' code, seems to be a very quick detection based
on the first few bytes of the request.

The longest match used in this module is the XML match, needing 10 bytes:

    POST /RPC2

Of course this class should not know anything about the detectors' internals,
but if we add a simple interface like required_size to the detectors, we can
read just the bytes we need:

    self._required_size = max(h.required_size for h in self.handlers)

And use this value for peeking:

    data = client_socket.recv(self._required_size, socket.MSG_PEEK)

This makes your intent very clear.
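A runnable sketch of the required_size idea. The required_size attribute is the interface proposed above; the two detector classes, and the assumption that a STOMP frame starts with CONNECT, are hypothetical:

```python
import socket


class StompDetectorSketch(object):
    """Hypothetical STOMP detector; assumes frames start with CONNECT."""

    required_size = len(b"CONNECT")

    def detect(self, data):
        return data.startswith(b"CONNECT")


class XmlDetectorSketch(object):
    """Hypothetical XML-RPC detector; needs the 10 bytes of "POST /RPC2"."""

    required_size = len(b"POST /RPC2")

    def detect(self, data):
        return data.startswith(b"POST /RPC2")


handlers = [StompDetectorSketch(), XmlDetectorSketch()]
# The acceptor only asks each detector how many bytes it needs.
required_size = max(h.required_size for h in handlers)

server_end, client_end = socket.socketpair()
client_end.sendall(b"POST /RPC2 HTTP/1.0\r\nContent-Length: 0\r\n\r\n")

# MSG_PEEK copies the bytes without consuming them, so the protocol
# handler that takes over the socket still sees the whole request.
data = server_end.recv(required_size, socket.MSG_PEEK)
matched = [type(h).__name__ for h in handlers if h.detect(data)]
```

One caveat worth checking in the real acceptor: on a stream socket, a peek can return fewer bytes than requested if the client has not sent them yet.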
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")
Line 149:                 client_socket.close()


Line 144:             _, client_socket = self.pending_connections[fd]
Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")
Please log the error in this case:

    self.log.warning("Not able to read data: %s", e)

Note: log.warn is not documented (it is only a deprecated alias of
log.warning), so please use log.warning.
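A small sketch of logging the exception through the documented warning() method. The logger name matches the patch; the simulated error and the list-collecting handler are only there so the output can be checked:

```python
import errno
import logging
import socket

log = logging.getLogger("protocolDetector.MultiProtocolAcceptor")

# Collect formatted messages so the example can be checked; vdsm would
# rely on its configured handlers instead.
messages = []


class ListHandler(logging.Handler):
    def emit(self, record):
        messages.append(record.getMessage())


log.addHandler(ListHandler())

try:
    # Simulated recv() failure; socket.error is an alias of OSError on
    # Python 3.
    raise socket.error(errno.ECONNRESET, "Connection reset by peer")
except socket.error as e:
    # Pass the exception as an argument: logging interpolates the message
    # lazily, only when a handler processes the record.
    log.warning("Not able to read data: %s", e)
```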
Line 149:                 client_socket.close()
Line 150:             return
Line 151: 
Line 152:         handler = self.detect_protocol(data)


Line 145:             data = client_socket.recv(4096, socket.MSG_PEEK)
Line 146:         except socket.error as e:
Line 147:             if e.errno not in (errno.EAGAIN, errno.EINPROGRESS):
Line 148:                 self.log.warn("Not able to read data")
Line 149:                 client_socket.close()
But you did not remove the socket from pending_connections and from the poller.
Please use:

    self._remove_connection(client_socket)
    client_socket.close()
Line 150:             return
Line 151: 
Line 152:         handler = self.detect_protocol(data)
Line 153:         if handler:


Line 149:                 client_socket.close()
Line 150:             return
Line 151: 
Line 152:         handler = self.detect_protocol(data)
Line 153:         if handler:
It would be clearer to handle the error first, as you do above, and continue
with the normal flow otherwise:

    handler = self.detect_protocol(data)
    if handler is None:
        handle the error...
        return

    handle the connection...
Line 154:             self.poller.unregister(client_socket)
Line 155:             del self.pending_connections[fd]
Line 156:             client_socket.setblocking(1)
Line 157:             handler.handleSocket(client_socket,


Line 152:         handler = self.detect_protocol(data)
Line 153:         if handler:
Line 154:             self.poller.unregister(client_socket)
Line 155:             del self.pending_connections[fd]
Line 156:             client_socket.setblocking(1)
Lines 154-156 can be replaced with self._remove_connection(client_socket).
Line 157:             handler.handleSocket(client_socket,
Line 158:                                  client_socket.getsockname())
Line 159:         else:
Line 160:             self.log.warn("Unrecognized protocol")


Line 157:             handler.handleSocket(client_socket,
Line 158:                                  client_socket.getsockname())
Line 159:         else:
Line 160:             self.log.warn("Unrecognized protocol")
Line 161:             client_socket.close()
Again, you don't remove the socket from the pending dict or the poller.
Line 162: 
Line 163:     def _create_socket(self, host, port):
Line 164:         addr = socket.getaddrinfo(host, port, socket.AF_INET,
Line 165:                                   socket.SOCK_STREAM)


Line 223:         if not self.xmlBinding:
Line 224:             self.log.warn("Xml handler not initialized")
Line 225: 
Line 226:     def detect(self, data):
Line 227:         return data.startswith("POST") and "/RPC2" in data
This should be something like:

    try:
        method, rest = data.split(" ", 1)
    except ValueError:
        return False
    return method == "POST" and rest.startswith("/RPC2")

Otherwise this request will match while it should not:

    POST / HTTP/1.1
    Surprise: /RPC2
    ...

It is unlikely that someone will send this request, but why have a fragile
detector when it is so easy to have a robust one?
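A standalone sketch comparing the two checks. The function names are hypothetical, and the sketch uses bytes because socket data is bytes on Python 3, whereas the patch's Python 2 code works on str:

```python
def detect_xml_rpc(data):
    """Hypothetical standalone version of the suggested _XmlDetector.detect."""
    try:
        method, rest = data.split(b" ", 1)
    except ValueError:  # no space at all: not an HTTP request line
        return False
    return method == b"POST" and rest.startswith(b"/RPC2")


def detect_xml_rpc_fragile(data):
    """The current check, for comparison."""
    return data.startswith(b"POST") and b"/RPC2" in data


# The request from the comment above: /RPC2 appears only in a header.
surprise = b"POST / HTTP/1.1\r\nSurprise: /RPC2\r\n\r\n"
```

The robust version only inspects the request line, so it also works on a short peek such as exactly b"POST /RPC2".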
Line 228: 
Line 229:     def handleSocket(self, client_socket, socket_address):
Line 230:         if self.xmlBinding:
Line 231:             self.xmlBinding.add_socket(client_socket, socket_address)


-- 
To view, visit http://gerrit.ovirt.org/26300
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Id739a40e2b37dcc175137ec91cd5ec166ad24a75
Gerrit-PatchSet: 20
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <[email protected]>
Gerrit-Reviewer: Barak Azulay <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Piotr Kliczewski <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: Yeela Kaplan <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
