jenkins-bot has submitted this change and it was merged.
Change subject: Switch to pymemcache for finer control and add retries on
network problems.
......................................................................
Switch to pymemcache for finer control and add retries on network problems.
- Add client name and kafka host
Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b
---
M handlers/Memcached.py
M kafka-watcher.py
M requirements.txt
M watcher.yaml
4 files changed, 51 insertions(+), 13 deletions(-)
Approvals:
Aaron Schulz: Looks good to me, but someone else must approve
Smalyshev: Looks good to me, approved
Gilles: Looks good to me, but someone else must approve
jenkins-bot: Verified
diff --git a/handlers/Memcached.py b/handlers/Memcached.py
index e6e231a..b972259 100644
--- a/handlers/Memcached.py
+++ b/handlers/Memcached.py
@@ -1,24 +1,46 @@
-import memcache
+from __future__ import print_function
+
+from pymemcache.client.base import Client
+from pymemcache.exceptions import MemcacheUnexpectedCloseError
import time
class Memcached(object):
+ DELAY = 0.5
+ DEBUG = False
- def __init__(self, hostname, **params):
- self.mc = memcache.Client([hostname])
+ def __init__(self, hostname, port, **params):
+ self.mc = Client((hostname, port))
def handle(self, topic, message):
+ """
+ """
+ if 'cmd' not in message:
+ raise Exception("Bad message: no command")
cmd = message['cmd']
if not hasattr(self, cmd):
raise Exception("Unknown command: " + cmd)
- getattr(self, cmd)(message)
+ tryit = True
+ while tryit:
+ tryit = False
+ try:
+ getattr(self, cmd)(message)
+ except MemcacheUnexpectedCloseError:
+ # Server dropped dead - we'll retry
+ tryit = True
+ except IOError:
+ # Something network-related - retry
+ tryit = True
+ if tryit:
+ time.sleep(self.DELAY)
def set(self, message):
text = message['val'].encode('utf-8')
if message.get('sbt', None):
purge_time = time.time() + message.get('uto', 0)
text = text.replace('$UNIXTIME$', '%.6f' % purge_time)
-# print("Set {0}-{1}-{2}".format(message['key'].encode('utf-8'), text,
int(message['ttl'])))
+ if self.DEBUG:
+ print("Set {0}-{1}-{2}".format(message['key'].encode('utf-8'),
text, int(message['ttl'])))
self.mc.set(message['key'].encode('utf-8'), text, int(message['ttl']))
def delete(self, message):
diff --git a/kafka-watcher.py b/kafka-watcher.py
index 0e49860..8b9d9e0 100755
--- a/kafka-watcher.py
+++ b/kafka-watcher.py
@@ -1,10 +1,16 @@
#!/usr/bin/python
+from __future__ import print_function
+
+from kafka import KafkaConsumer
import yaml
import argparse
-from kafka import KafkaConsumer
import imp
import json
import sys
+import socket
+import hashlib
+
+# Kafka client is https://github.com/dpkp/kafka-python
parser = argparse.ArgumentParser(description='Process cache relay commands
from Kafka')
parser.add_argument('--config', required=True, help='YAML configuration file')
@@ -25,20 +31,29 @@
klass = getattr(mod, listener['handler'])
handlers[listener['topic']] = klass(**listener['params'])
-consumer = KafkaConsumer(*topics)
+if 'server' in config:
+ server = config['server']
+else:
+ server = 'localhost'
+
+client_id = "kafka-watcher-%s-%s" % (socket.gethostname(),
hashlib.md5(script_args.config).hexdigest())
+
+print("Connecting to %s as %s" % (server, client_id))
+
+consumer = KafkaConsumer(*topics, bootstrap_servers=server,
client_id=client_id)
for msg in consumer:
if msg.topic not in handlers:
- print("Weird, unknown topic %s" % msg.topic)
+ print("Weird, unknown topic %s" % msg.topic, file=sys.stderr)
continue
try:
data = json.loads(msg.value)
except ValueError:
data = None
if not data:
- print("Could not parse data, meh")
+ print("Could not parse data, meh", file=sys.stderr)
continue
try:
handlers[msg.topic].handle(msg.topic, data)
except:
e = sys.exc_info()
- print("Oops, something happened: " + str(e))
+ print("Oops, something happened: " + str(e), file=sys.stderr)
diff --git a/requirements.txt b/requirements.txt
index 04741d5..c85563b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
yaml
kafka
-memcache
+pymemcache>=1.3.2
diff --git a/watcher.yaml b/watcher.yaml
index d0b5353..490d38a 100644
--- a/watcher.yaml
+++ b/watcher.yaml
@@ -2,5 +2,6 @@
- topic: wancache-purge
handler: Memcached
params:
- hostname: localhost:11211
-
+ hostname: localhost
+ port: 11211
+
--
To view, visit https://gerrit.wikimedia.org/r/284989
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b
Gerrit-PatchSet: 8
Gerrit-Project: mediawiki/services/kafka-watcher
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Filippo Giunchedi <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits