Yuvipanda has uploaded a new change for review.
https://gerrit.wikimedia.org/r/76764
Change subject: Add separate process that listens for subscription changes
......................................................................
Add separate process that listens for subscription changes
Uses 0mq to communicate between registrar and subscriber. Uses
a common file stored in a known location to communicate on which
host and port the registrar is running. Clients should be able
to do operations on keys only if they already know the key.
FIXME: Autogenerate the key, rather than let the client provide
it. Our bot authors will just provide dummy keys, never secure
ones
Change-Id: I203881dfc15a59cb629d047c33c14f5b60390627
---
A src/registrar.py
M src/subscriptions.py
2 files changed, 85 insertions(+), 30 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/labs/tools/gerrit-to-redis
refs/changes/64/76764/1
diff --git a/src/registrar.py b/src/registrar.py
new file mode 100644
index 0000000..4c833d8
--- /dev/null
+++ b/src/registrar.py
@@ -0,0 +1,71 @@
+import sys
+import os
+import platform
+import logging
+import json
+
+import yaml
+import zmq
+import redis
+
+
+BASE_PATH = os.path.join(os.path.dirname(__file__), '..')
+CONFIG_FILE = os.path.join(BASE_PATH, 'config.yaml')
+with open(CONFIG_FILE) as f:
+ config = yaml.load(f)
+
+REDIS_DB = config['redis']['db']
+REDIS_HOST = config['redis']['host']
+PREFIX = config['stream_receiver']['redis_prefix']
+CLIENTS_KEY = config['stream_receiver']['clients_key']
+
+logging.basicConfig(format='%(asctime)s %(message)s',
filename=os.path.expanduser('~/logs/stream-subscriptions'), level=logging.INFO)
+
+logging.info('Attempting to Redis connection to %s', REDIS_HOST)
+red = redis.StrictRedis(host=REDIS_HOST, db=REDIS_DB)
+logging.info('Redis connection to %s succeded', REDIS_HOST)
+
+def make_key(*key_parts):
+ return PREFIX + "_" + '.'.join(key_parts)
+
+def add_subscription(msg):
+ # Save these explicitly, to avoid them from getting lost
+ red.pipeline().sadd(make_key(CLIENTS_KEY), msg['key']).save().execute()
+ logging.info('Added key %s' % msg['key'])
+ return "Added key"
+
+def delete_subscription(msg):
+ red.pipeline().delete(msg['key']).srem(make_key(CLIENTS_KEY),
msg['key']).save().execute()
+ logging.info('Removed key %s' % msg['key'])
+ return "Removed key"
+
+# Assumes that files created will, by default, not be writable by others
+# Good enough security, eh?
+def write_address_file(port):
+ address = "tcp://%s:%s" % (platform.node(), port)
+ address_path = os.path.join(BASE_PATH, 'registrar')
+
+ address_file = open(address_path, 'w')
+ address_file.write(address)
+ address_file.close()
+
+if __name__ == '__main__':
+ context = zmq.Context()
+ socket = context.socket(zmq.REP)
+ port = socket.bind_to_random_port("tcp://*")
+
+ write_address_file(port)
+
+ while True:
+ try:
+ msg = json.loads(socket.recv())
+ except ValueError:
+ print "Invalid JSON!"
+ continue
+ if msg['action'] == 'add':
+ ret = add_subscription(msg)
+ elif msg['action'] == 'delete':
+ ret = add_subscription(msg)
+ else:
+ ret = "Unrecognized action: %s " % msg.action
+ socket.send(ret)
diff --git a/src/subscriptions.py b/src/subscriptions.py
index 3f8a951..6f56d3b 100755
--- a/src/subscriptions.py
+++ b/src/subscriptions.py
@@ -3,43 +3,30 @@
import os
import logging
import argparse
+import json
-import redis
-import yaml
-
+import zmq
BASE_PATH = os.path.join(os.path.dirname(__file__), '..')
-CONFIG_FILE = os.path.join(BASE_PATH, 'config.yaml')
-with open(CONFIG_FILE) as f:
- config = yaml.load(f)
-REDIS_DB = config['redis']['db']
-REDIS_HOST = config['redis']['host']
-PREFIX = config['stream_receiver']['redis_prefix']
-CLIENTS_KEY = config['stream_receiver']['clients_key']
+context = zmq.Context()
+socket = context.socket(zmq.REQ)
-logging.basicConfig(format='%(asctime)s %(message)s',
filename=os.path.expanduser('~/logs/stream-subscriptions'), level=logging.INFO)
+ADDRESS_PATH = os.path.join(BASE_PATH, 'registrar')
+with open(ADDRESS_PATH) as f:
+ address = f.read()
-logging.info('Attempting to Redis connection to %s', REDIS_HOST)
-red = redis.StrictRedis(host=REDIS_HOST, db=REDIS_DB)
-logging.info('Redis connection to %s succeded', REDIS_HOST)
-
-def make_key(*key_parts):
- return PREFIX + "_" + '.'.join(key_parts)
+socket.connect(address)
def add_subscription(args):
- # Save these explicitly, to avoid them from getting lost
- red.pipeline().sadd(make_key(CLIENTS_KEY), args.key).save().execute()
- logging.info('Added key %s' % args.key)
-
-def list_subscriptions(args):
- subscriptions = red.smembers(make_key(CLIENTS_KEY))
- for subscription in subscriptions:
- print "%s - %s" % (subscription, red.llen(subscription))
+ data = {'action': 'add', 'key': args.key}
+ socket.send(json.dumps(data))
+ print socket.recv()
def delete_subscription(args):
- red.pipeline().delete(args.key).srem(make_key(CLIENTS_KEY),
args.key).save().execute()
- logging.info('Removed key %s' % args.key)
+ date = {'action': 'delete', 'key': args.key}
+ socket.send(json.dumps(data))
+ print socket.recv()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Manage redis gerrit
subscriptions")
@@ -48,9 +35,6 @@
parser_subscribe = subparsers.add_parser('add', help='Add a subscription')
parser_subscribe.add_argument('key')
parser_subscribe.set_defaults(func=add_subscription)
-
- parser_list_subscriptions = subparsers.add_parser('list', help='List all
subscriptions')
- parser_list_subscriptions.set_defaults(func=list_subscriptions)
parser_subscribe = subparsers.add_parser('delete', help='Delete a
subscription')
parser_subscribe.add_argument('key')
--
To view, visit https://gerrit.wikimedia.org/r/76764
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I203881dfc15a59cb629d047c33c14f5b60390627
Gerrit-PatchSet: 1
Gerrit-Project: labs/tools/gerrit-to-redis
Gerrit-Branch: master
Gerrit-Owner: Yuvipanda <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits