BryanDavis has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/381165 )
Change subject: wmcs: update rabbitmq drain_queue script ...................................................................... wmcs: update rabbitmq drain_queue script * Move the script file to an OpenStack version neutral location * Port to Python3 * Update to chunk message downloads so large queues can be drained without crashing rabbitmq * Add optimization for --quiet purges With these changes it should be safe to cron this script to dump the notifications.error queue to a log file once an hour to keep that queue from backing up and also to make it something that we can actually examine for useful information. I'll leave that cron for another patch in part because I'm not sure where to put it in the module/profile hierarchy. Bug: T170492 Change-Id: Ia1a54d1cfe60abc8ce1429dbd2fa8b9530d68050 --- D modules/openstack2/files/liberty/admin_scripts/drain_queue A modules/openstack2/files/util/drain_queue M modules/openstack2/manifests/util/admin_scripts.pp 3 files changed, 121 insertions(+), 99 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/65/381165/1 diff --git a/modules/openstack2/files/liberty/admin_scripts/drain_queue b/modules/openstack2/files/liberty/admin_scripts/drain_queue deleted file mode 100644 index 0c48b7b..0000000 --- a/modules/openstack2/files/liberty/admin_scripts/drain_queue +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/python -# Drain an oslo.message rabbitmq queue -# -# The contents of this file are subject to the Mozilla Public License -# Version 1.1 (the 'License'); you may not use this file except in -# compliance with the License. You may obtain a copy of the License at -# http://www.mozilla.org/MPL/ -# -# Software distributed under the License is distributed on an 'AS IS' -# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -# License for the specific language governing rights and limitations -# under the License. -# -# The Original Code is RabbitMQ Management Plugin. 
-# -# The Initial Developer of the Original Code is GoPivotal, Inc. -# Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved. -# -# Modified by Bryan Davis <[email protected]> -# Copyright (c) 2017 Wikimedia Foundation and contributors -from __future__ import print_function - -import argparse -import base64 -import httplib -import json -import socket -import sys -import urlparse - - -def die(s): - print('*** {}'.format(s), file=sys.stderr) - exit(1) - - -def http(verb, path, body=None): - path = '/api%s' % path - conn = httplib.HTTPConnection('localhost', 15672) - headers = { - 'Authorization': 'Basic ' + base64.b64encode('guest:guest'), - } - if body: - headers['Content-Type'] = 'application/json' - try: - conn.request(verb, path, body, headers) - except socket.error as e: - die('Could not connect: {0}'.format(e)) - resp = conn.getresponse() - if resp.status == 400: - die(json.loads(resp.read())['reason']) - if resp.status == 401: - die('Access refused: {0}'.format(path)) - if resp.status == 404: - die('Not found: {0}'.format(path)) - if resp.status == 301: - url = urlparse.urlparse(resp.getheader('location')) - [host, port] = url.netloc.split(':') - return post(url.path + '?' 
+ url.query, body) - if resp.status < 200 or resp.status > 400: - raise Exception('Received %d %s for path %s\n%s' - % (resp.status, resp.reason, path, resp.read())) - return json.loads(resp.read().decode('utf-8')) - -def main(): - parser = argparse.ArgumentParser( - description='Drain an oslo.message rabbitmq queue') - parser.add_argument( - '--dry-run', dest='requeue', action='store_true', - help='return messages to the queue after printing') - parser.add_argument( - '--silent', dest='print', action='store_false', - help='silent mode') - parser.add_argument( - 'queue', metavar='QUEUE', nargs=1, - help='queue to read messages from') - args = parser.parse_args() - - info = http('GET', '/queues/%2F/{}'.format(args.queue[0])) - - res = http( - 'POST', - '/queues/%2F/{}/get'.format(args.queue[0]), - json.dumps({ - 'count': info['messages_ready'], - 'requeue': args.requeue, - 'encoding': 'auto' - }) - ) - - if args.print: - for r in res: - payload = json.loads(r['payload']) - msg = json.loads(payload['oslo.message']) - print(json.dumps(msg)) - -if __name__ == '__main__': - main() diff --git a/modules/openstack2/files/util/drain_queue b/modules/openstack2/files/util/drain_queue new file mode 100644 index 0000000..bd14e4e --- /dev/null +++ b/modules/openstack2/files/util/drain_queue @@ -0,0 +1,120 @@ +#!/usr/bin/python3 -Es +# Drain an oslo.message rabbitmq queue +# +# The contents of this file are subject to the Mozilla Public License +# Version 1.1 (the 'License'); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# http://www.mozilla.org/MPL/ +# +# Software distributed under the License is distributed on an 'AS IS' +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +# License for the specific language governing rights and limitations +# under the License. +# +# The Original Code is RabbitMQ Management Plugin. +# +# The Initial Developer of the Original Code is GoPivotal, Inc. 
+# Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved. +# +# Modified by Bryan Davis <[email protected]> +# Copyright (c) 2017 Wikimedia Foundation and contributors + +import argparse +import base64 +import http.client +import json +import socket +import sys +import urllib.parse + + +def die(s): + print('*** {}'.format(s), file=sys.stderr) + exit(1) + + +def http_req(verb, path, body=None): + path = '/api%s' % path + conn = http.client.HTTPConnection('localhost', 15672) + headers = { + 'Authorization': 'Basic {}'.format( + base64.b64encode(b'guest:guest').decode('ascii')), + } + if body: + headers['Content-Type'] = 'application/json' + try: + conn.request(verb, path, body, headers) + except socket.error as e: + die('Could not connect: {0}'.format(e)) + resp = conn.getresponse() + resp_body = resp.read().decode('utf-8') + if resp.status == 400: + die(json.loads(resp_body)['reason']) + if resp.status == 401: + die('Access refused: {}'.format(path)) + if resp.status == 404: + die('Not found: {}'.format(path)) + if resp.status == 301: + url = urllib.parse.urlparse(resp.getheader('location')) + [host, port] = url.netloc.split(':') + return http_req(verb, url.path + '?' 
+ url.query, body) + if resp.status < 200 or resp.status > 400: + raise Exception( + 'Received {:d} {} for path {}\n{}'.format( + resp.status, resp.reason, path, resp_body)) + return resp_body + + +def http_json(verb, path, body=None): + return json.loads(http_req(verb, path, body)) + + +def message_count(queue): + return http_json('GET', '/queues/%2F/{}'.format(queue))['messages_ready'] + + +def main(): + parser = argparse.ArgumentParser( + description='Drain an oslo.message rabbitmq queue') + group = parser.add_mutually_exclusive_group() + group.add_argument( + '--dry-run', dest='requeue', action='store_true', + help='return messages to the queue after printing') + group.add_argument( + '--silent', dest='silent', action='store_true', + help='silent mode') + parser.add_argument( + 'queue', metavar='QUEUE', nargs=1, + help='queue to read messages from') + args = parser.parse_args() + queue = args.queue[0] + + if args.silent: + http_req('DELETE', '/queues/%2F/{}/contents'.format(queue)) + + else: + ready = message_count(queue) + while ready > 0: + msgs = http_json( + 'POST', + '/queues/%2F/{}/get'.format(queue), + json.dumps({ + 'count': 1000, # Limit response size + 'requeue': args.requeue, + 'encoding': 'auto' + }) + ) + + for m in msgs: + payload = json.loads(m['payload']) + msg = json.loads(payload['oslo.message']) + print(json.dumps(msg)) + + if args.requeue: + ready = 0 # prevent infinite loops + else: + ready = message_count(queue) + + +if __name__ == '__main__': + main() diff --git a/modules/openstack2/manifests/util/admin_scripts.pp b/modules/openstack2/manifests/util/admin_scripts.pp index c51cb5d..feac5e5 100644 --- a/modules/openstack2/manifests/util/admin_scripts.pp +++ b/modules/openstack2/manifests/util/admin_scripts.pp @@ -14,7 +14,7 @@ owner => 'root', group => 'root', mode => '0655', - source => "puppet:///modules/openstack2/${version}/admin_scripts/drain_queue", + source => "puppet:///modules/openstack2/util/drain_queue", } # Script to cold-migrate 
instances between compute nodes -- To view, visit https://gerrit.wikimedia.org/r/381165 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia1a54d1cfe60abc8ce1429dbd2fa8b9530d68050 Gerrit-PatchSet: 1 Gerrit-Project: operations/puppet Gerrit-Branch: production Gerrit-Owner: BryanDavis <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
