Ori.livneh has uploaded a new change for review. https://gerrit.wikimedia.org/r/321230
Change subject: Use systemd's process watchdog to trigger restarts
......................................................................

Use systemd's process watchdog to trigger restarts

I3ad8bb49ce was supposed to take care of the problem of statsv getting
stuck and not receiving incoming messages, but it didn't. The traceback
in T150359 makes it clear that there is a bug in pykafka that makes it
handle Kafka restarts poorly. So instead, rely on systemd's process
watchdog functionality to make sure statsv is terminated (and restarted)
if no messages are received in some span of time, determined by the
value of 'WatchdogSec=' in the service unit's file.

See https://www.freedesktop.org/software/systemd/man/systemd.service.html#WatchdogSec=
and https://www.freedesktop.org/software/systemd/man/sd_notify.html# for
more info.

Change-Id: Ie471fa762b6f52f6114aec36a89094ddc76eb77c
---
M statsv.py
1 file changed, 49 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/statsv refs/changes/30/321230/1

diff --git a/statsv.py b/statsv.py
index 1917bfa..969b9ee 100644
--- a/statsv.py
+++ b/statsv.py
@@ -26,6 +26,7 @@
 import json
 import logging
 import multiprocessing
+import os
 import re
 import socket
 import urlparse
@@ -48,6 +49,53 @@
     'kafka1020.eqiad.wmnet:9092',
     'kafka1022.eqiad.wmnet:9092',
 )))
+
+SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0x80000)
+
+
+class Watchdog:
+    """
+    Simple notifier for systemd's process watchdog.
+
+    You can use this in message- or request-processing scripts that are
+    managed by systemd and that are under constant load, where the
+    absence of work is an abnormal condition.
+
+    Make sure the unit file contains `WatchdogSec=1` (or some other
+    value) and `Restart=always`. Then you can write something like:
+
+        watchdog = Watchdog()
+        while 1:
+            handle_request()
+            watchdog.notify()
+
+    This way, if the script spends a full second without handling a
+    request, systemd will restart it.
+
+    See https://www.freedesktop.org/software/systemd/man/systemd.service.html#WatchdogSec=
+    for more details about systemd's watchdog capabilities.
+    """
+
+    def __init__(self):
+        # Get and clear NOTIFY_SOCKET from the environment to prevent
+        # subprocesses from inheriting it.
+        self.addr = os.environ.pop('NOTIFY_SOCKET', None)
+        if not self.addr:
+            self.sock = None
+            return
+
+        # If the first character of NOTIFY_SOCKET is "@", the string is
+        # understood as an abstract socket address.
+        if self.addr.startswith('@'):
+            self.addr = '\0' + self.addr[1:]
+
+        self.sock = socket.socket(
+            socket.AF_UNIX, socket.SOCK_DGRAM | SOCK_CLOEXEC)
+
+    def notify(self):
+        if not self.sock:
+            return
+        self.sock.sendto(b'WATCHDOG=1', self.addr)
 
 
 def process_queue(q):
@@ -90,6 +138,7 @@
     for message in consumer:
         if message is not None:
             queue.put(message.value)
+            watchdog.notify()
 
     # If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
     queue.close()

-- 
To view, visit https://gerrit.wikimedia.org/r/321230
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie471fa762b6f52f6114aec36a89094ddc76eb77c
Gerrit-PatchSet: 1
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
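
For anyone who wants to try the watchdog datagram outside of systemd, here is a minimal sketch. It is not part of the change: the names notify_watchdog and demo are made up for illustration, and a local AF_UNIX datagram socket stands in for the one systemd would normally advertise via NOTIFY_SOCKET. It only shows the 'WATCHDOG=1' message format and the "@" abstract-address handling described in the patch, nothing about restart behaviour.

```python
import os
import socket
import tempfile


def notify_watchdog(notify_addr):
    """Send one WATCHDOG=1 datagram to a NOTIFY_SOCKET-style address.

    Hypothetical helper; mirrors what Watchdog.notify() in the patch sends.
    """
    # A leading "@" denotes a Linux abstract socket address, which is
    # expressed to the kernel with a leading NUL byte instead.
    if notify_addr.startswith('@'):
        notify_addr = '\0' + notify_addr[1:]
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.sendto(b'WATCHDOG=1', notify_addr)
    finally:
        sock.close()


def demo():
    """Play the role of systemd with a local socket; return what arrives."""
    tmpdir = tempfile.mkdtemp()
    path = os.path.join(tmpdir, 'notify.sock')
    listener = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        listener.bind(path)
        listener.settimeout(2.0)  # fail fast instead of blocking forever
        notify_watchdog(path)
        return listener.recv(64)
    finally:
        listener.close()
        if os.path.exists(path):
            os.unlink(path)
        os.rmdir(tmpdir)


if __name__ == '__main__':
    print(repr(demo()))
```

Under a real unit, systemd sets NOTIFY_SOCKET itself, so the address would come from the environment rather than from a temporary path as assumed here.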
