Ori.livneh has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/321230

Change subject: Use systemd's process watchdog to trigger restarts
......................................................................

Use systemd's process watchdog to trigger restarts

I3ad8bb49ce was supposed to take care of the problem of statsv getting
stuck and not receiving incoming messages, but it didn't. The traceback
in T150359 makes it clear that there is a bug in pykafka that makes it
handle Kafka restarts poorly.

So instead, rely on systemd's process watchdog functionality to make
sure statsv is terminated (and restarted) if no messages are received
within the interval set by 'WatchdogSec=' in the service's unit file.

See
https://www.freedesktop.org/software/systemd/man/systemd.service.html#WatchdogSec=
and
https://www.freedesktop.org/software/systemd/man/sd_notify.html#
for more info.
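
The notification protocol described above can be sketched on its own,
outside statsv. This is a minimal illustration, not the code in this
change: it assumes Python 3, and the function name notify_watchdog and
its environ parameter are hypothetical. It opens a fresh socket per
call, whereas the Watchdog class in the diff keeps a single socket open.

```python
import os
import socket


def notify_watchdog(environ=os.environ):
    """Send one WATCHDOG=1 keep-alive to the socket named by NOTIFY_SOCKET.

    Returns True if a datagram was sent, or False if NOTIFY_SOCKET is
    unset (for example, when the script is not running under systemd).
    """
    addr = environ.get('NOTIFY_SOCKET')
    if not addr:
        return False

    # A leading "@" denotes a Linux abstract socket address, which is
    # spelled with a leading NUL byte at the socket API level.
    if addr.startswith('@'):
        addr = '\0' + addr[1:]

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.sendto(b'WATCHDOG=1', addr)
    finally:
        sock.close()
    return True
```

Calling notify_watchdog() after each unit of work resets the watchdog
timer; if no call arrives within the 'WatchdogSec=' interval, systemd
treats the service as failed.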

Change-Id: Ie471fa762b6f52f6114aec36a89094ddc76eb77c
---
M statsv.py
1 file changed, 49 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/statsv refs/changes/30/321230/1

diff --git a/statsv.py b/statsv.py
index 1917bfa..969b9ee 100644
--- a/statsv.py
+++ b/statsv.py
@@ -26,6 +26,7 @@
 import json
 import logging
 import multiprocessing
+import os
 import re
 import socket
 import urlparse
@@ -48,6 +49,53 @@
     'kafka1020.eqiad.wmnet:9092',
     'kafka1022.eqiad.wmnet:9092',
 )))
+
+SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0x80000)
+
+
+class Watchdog:
+    """
+    Simple notifier for systemd's process watchdog.
+
+    You can use this in message- or request-processing scripts that are
+    managed by systemd and that are under constant load, where the
+    absence of work is an abnormal condition.
+
+    Make sure the unit file contains `WatchdogSec=1` (or some other
+    value) and `Restart=always`. Then you can write something like:
+
+        watchdog = Watchdog()
+        while 1:
+            handle_request()
+            watchdog.notify()
+
+    This way, if the script spends a full second without handling a
+    request, systemd will restart it.
+
+    See https://www.freedesktop.org/software/systemd/man/systemd.service.html#WatchdogSec=
+    for more details about systemd's watchdog capabilities.
+    """
+
+    def __init__(self):
+        # Get and clear NOTIFY_SOCKET from the environment to prevent
+        # subprocesses from inheriting it.
+        self.addr = os.environ.pop('NOTIFY_SOCKET', None)
+        if not self.addr:
+            self.sock = None
+            return
+
+        # If the first character of NOTIFY_SOCKET is "@", the string is
+        # understood as an abstract socket address.
+        if self.addr.startswith('@'):
+            self.addr = '\0' + self.addr[1:]
+
+        self.sock = socket.socket(
+            socket.AF_UNIX, socket.SOCK_DGRAM | SOCK_CLOEXEC)
+
+    def notify(self):
+        if not self.sock:
+            return
+        self.sock.sendto(b'WATCHDOG=1', self.addr)
 
 
 def process_queue(q):
@@ -90,6 +138,7 @@
 for message in consumer:
     if message is not None:
         queue.put(message.value)
+        watchdog.notify()
 
 # If we reach this line, TIMEOUT_SECONDS elapsed with no events received.
 queue.close()

-- 
To view, visit https://gerrit.wikimedia.org/r/321230
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie471fa762b6f52f6114aec36a89094ddc76eb77c
Gerrit-PatchSet: 1
Gerrit-Project: analytics/statsv
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
