jenkins-bot has submitted this change and it was merged.

Change subject: Switch to pymemcache for finer control and add retries on 
network problems.
......................................................................


Switch to pymemcache for finer control and add retries on network problems.

- Add client name and kafka host

Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b
---
M handlers/Memcached.py
M kafka-watcher.py
M requirements.txt
M watcher.yaml
4 files changed, 51 insertions(+), 13 deletions(-)

Approvals:
  Aaron Schulz: Looks good to me, but someone else must approve
  Smalyshev: Looks good to me, approved
  Gilles: Looks good to me, but someone else must approve
  jenkins-bot: Verified



diff --git a/handlers/Memcached.py b/handlers/Memcached.py
index e6e231a..b972259 100644
--- a/handlers/Memcached.py
+++ b/handlers/Memcached.py
@@ -1,24 +1,46 @@
-import memcache
+from __future__ import print_function
+
+from pymemcache.client.base import Client
+from pymemcache.exceptions import MemcacheUnexpectedCloseError
 import time
 
 
 class Memcached(object):
+    DELAY = 0.5
+    DEBUG = False
 
-    def __init__(self, hostname, **params):
-        self.mc = memcache.Client([hostname])
+    def __init__(self, hostname, port, **params):
+        self.mc = Client((hostname, port))
 
     def handle(self, topic, message):
+        """
+        """
+        if 'cmd' not in message:
+            raise Exception("Bad message: no command")
         cmd = message['cmd']
         if not hasattr(self, cmd):
             raise Exception("Unknown command: " + cmd)
-        getattr(self, cmd)(message)
+        tryit = True
+        while tryit:
+            tryit = False
+            try:
+                getattr(self, cmd)(message)
+            except MemcacheUnexpectedCloseError:
+                # Server dropped dead - we'll retry
+                tryit = True
+            except IOError:
+                # Something network-related - retry
+                tryit = True
+            if tryit:
+                time.sleep(self.DELAY)
 
     def set(self, message):
         text = message['val'].encode('utf-8')
         if message.get('sbt', None):
             purge_time = time.time() + message.get('uto', 0)
             text = text.replace('$UNIXTIME$', '%.6f' % purge_time)
-#        print("Set {0}-{1}-{2}".format(message['key'].encode('utf-8'), text, 
int(message['ttl'])))
+        if self.DEBUG:
+            print("Set {0}-{1}-{2}".format(message['key'].encode('utf-8'), 
text, int(message['ttl'])))
         self.mc.set(message['key'].encode('utf-8'), text, int(message['ttl']))
 
     def delete(self, message):
diff --git a/kafka-watcher.py b/kafka-watcher.py
index 0e49860..8b9d9e0 100755
--- a/kafka-watcher.py
+++ b/kafka-watcher.py
@@ -1,10 +1,16 @@
 #!/usr/bin/python
+from __future__ import print_function
+
+from kafka import KafkaConsumer
 import yaml
 import argparse
-from kafka import KafkaConsumer
 import imp
 import json
 import sys
+import socket
+import hashlib
+
+# Kafka client is https://github.com/dpkp/kafka-python
 
 parser = argparse.ArgumentParser(description='Process cache relay commands 
from Kafka')
 parser.add_argument('--config', required=True, help='YAML configuration file')
@@ -25,20 +31,29 @@
     klass = getattr(mod, listener['handler'])
     handlers[listener['topic']] = klass(**listener['params'])
 
-consumer = KafkaConsumer(*topics)
+if 'server' in config:
+    server = config['server']
+else:
+    server = 'localhost'
+
+client_id = "kafka-watcher-%s-%s" % (socket.gethostname(), 
hashlib.md5(script_args.config).hexdigest())
+
+print("Connecting to %s as %s" % (server, client_id))
+
+consumer = KafkaConsumer(*topics, bootstrap_servers=server, 
client_id=client_id)
 for msg in consumer:
     if msg.topic not in handlers:
-        print("Weird, unknown topic %s" % msg.topic)
+        print("Weird, unknown topic %s" % msg.topic, file=sys.stderr)
         continue
     try:
         data = json.loads(msg.value)
     except ValueError:
         data = None
     if not data:
-        print("Could not parse data, meh")
+        print("Could not parse data, meh", file=sys.stderr)
         continue
     try:
         handlers[msg.topic].handle(msg.topic, data)
     except:
         e = sys.exc_info()
-        print("Oops, something happened: " + str(e))
+        print("Oops, something happened: " + str(e), file=sys.stderr)
diff --git a/requirements.txt b/requirements.txt
index 04741d5..c85563b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
 yaml
 kafka
-memcache
+pymemcache>=1.3.2
diff --git a/watcher.yaml b/watcher.yaml
index d0b5353..490d38a 100644
--- a/watcher.yaml
+++ b/watcher.yaml
@@ -2,5 +2,6 @@
     - topic: wancache-purge
       handler: Memcached
       params: 
-        hostname: localhost:11211
-    
+        hostname: localhost
+        port: 11211
+

-- 
To view, visit https://gerrit.wikimedia.org/r/284989
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I452c2110599faed7fe7d20e2fb5b5c05f023288b
Gerrit-PatchSet: 8
Gerrit-Project: mediawiki/services/kafka-watcher
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>
Gerrit-Reviewer: Aaron Schulz <[email protected]>
Gerrit-Reviewer: Filippo Giunchedi <[email protected]>
Gerrit-Reviewer: Gilles <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to