jenkins-bot has submitted this change and it was merged. Change subject: Implement multiversion/refreshWikiversionsCDB in python ......................................................................
Implement multiversion/refreshWikiversionsCDB in python Adds a pure python version of the multiversion/refreshWikiversionsCDB PHP script. * Add wmf_realm and datacenter configuration settings * Add a python implementation of MWRealm's getRealmSpecificFilename * Import cdblib.py from https://python-pure-cdb.googlecode.com/ * Add tasks.compile_wikiversions_cdb as a refreshWikiversionsCDB workalike * Change tasks.scap to use compile_wikiversions_cdb instead of shelling out to PHP script Change-Id: Ie50be09e724b276a009037a4ee2fd059d1fd87dc --- M docs/api.rst M scap.cfg A scap/cdblib.py M scap/config.py M scap/tasks.py M scap/utils.py 6 files changed, 363 insertions(+), 2 deletions(-) Approvals: Ori.livneh: Looks good to me, approved jenkins-bot: Verified diff --git a/docs/api.rst b/docs/api.rst index 921e048..9d12e10 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -15,3 +15,10 @@ .. automodule:: scap.tasks .. automodule:: scap.utils + +Third party +=========== + +cdblib +------ +.. automodule:: scap.cdblib diff --git a/scap.cfg b/scap.cfg index b57dff6..55acbc8 100644 --- a/scap.cfg +++ b/scap.cfg @@ -32,12 +32,31 @@ # Statsd server port statsd_port: 2003 +# Deployment realm +wmf_realm: production + +# Deployment datacenter +datacenter: pmtpa + + +[eqiad.wmnet] +# Wikimedia Foundation production eqiad datacenter +datacenter: eqiad + + [wmnet] # Wikimedia Foundation production cluster configuration master_rsync: tin.eqiad.wmnet statsd_host: statsd.eqiad.wmnet + +[eqiad.wmflabs] +# Wikimedia Foundation beta eqiad datacenter +datacenter: eqiad + + [wmflabs] # Wikimedia Foundation beta cluster configuration master_rsync: deployment-bastion.pmtpa.wmflabs statsd_host: deployment-bastion.pmtpa.wmflabs +wmf_realm: labs diff --git a/scap/cdblib.py b/scap/cdblib.py new file mode 100644 index 0000000..bc64058 --- /dev/null +++ b/scap/cdblib.py @@ -0,0 +1,235 @@ +# -*- coding: utf-8 -*- +''' +| Imported from: `python-pure-cdb <https://python-pure-cdb.googlecode.com/>`_ +| Author: 
David Wilson +| License: MIT + +Manipulate DJB's Constant Databases. These are 2 level disk-based hash tables +that efficiently handle many keys, while remaining space-efficient. + + http://cr.yp.to/cdb.html + +When generated databases are only used with Python code, consider using hash() +rather than djb_hash() for a tidy speedup. + +.. note:: + Minor alterations made to comply with PEP8 style check and to remove + attempt to import C implementation of djb_hash. -- bd808, 2014-03-04 +''' + +from _struct import Struct +from itertools import chain + + +def py_djb_hash(s): + '''Return the value of DJB's hash function for the given 8-bit string.''' + h = 5381 + for c in s: + h = (((h << 5) + h) ^ ord(c)) & 0xffffffff + return h + +# 2014-03-04 bd808: removed try block for importing C hash implementation +djb_hash = py_djb_hash + +read_2_le4 = Struct('<LL').unpack +write_2_le4 = Struct('<LL').pack + + +class Reader(object): + '''A dictionary-like object for reading a Constant Database accessed + through a string or string-like sequence, such as mmap.mmap().''' + + def __init__(self, data, hashfn=djb_hash): + '''Create an instance reading from a sequence and using hashfn to hash + keys.''' + if len(data) < 2048: + raise IOError('CDB too small') + + self.data = data + self.hashfn = hashfn + + self.index = [read_2_le4(data[i:i + 8]) for i in xrange(0, 2048, 8)] + self.table_start = min(p[0] for p in self.index) + # Assume load load factor is 0.5 like official CDB. + self.length = sum(p[1] >> 1 for p in self.index) + + def iteritems(self): + '''Like dict.iteritems(). 
Items are returned in insertion order.''' + pos = 2048 + while pos < self.table_start: + klen, dlen = read_2_le4(self.data[pos:pos + 8]) + pos += 8 + + key = self.data[pos:pos + klen] + pos += klen + + data = self.data[pos:pos + dlen] + pos += dlen + + yield key, data + + def items(self): + '''Like dict.items().''' + return list(self.iteritems()) + + def iterkeys(self): + '''Like dict.iterkeys().''' + return (p[0] for p in self.iteritems()) + __iter__ = iterkeys + + def itervalues(self): + '''Like dict.itervalues().''' + return (p[1] for p in self.iteritems()) + + def keys(self): + '''Like dict.keys().''' + return [p[0] for p in self.iteritems()] + + def values(self): + '''Like dict.values().''' + return [p[1] for p in self.iteritems()] + + def __getitem__(self, key): + '''Like dict.__getitem__().''' + value = self.get(key) + if value is None: + raise KeyError(key) + return value + + def has_key(self, key): + '''Return True if key exists in the database.''' + return self.get(key) is not None + __contains__ = has_key + + def __len__(self): + '''Return the number of records in the database.''' + return self.length + + def gets(self, key): + '''Yield values for key in insertion order.''' + # Truncate to 32 bits and remove sign. + h = self.hashfn(key) & 0xffffffff + start, nslots = self.index[h & 0xff] + + if nslots: + end = start + (nslots << 3) + slot_off = start + (((h >> 8) % nslots) << 3) + + for pos in chain(xrange(slot_off, end, 8), + xrange(start, slot_off, 8)): + rec_h, rec_pos = read_2_le4(self.data[pos:pos + 8]) + + if not rec_h: + break + elif rec_h == h: + klen, dlen = read_2_le4(self.data[rec_pos:rec_pos + 8]) + rec_pos += 8 + + if self.data[rec_pos:rec_pos + klen] == key: + rec_pos += klen + yield self.data[rec_pos:rec_pos + dlen] + + def get(self, key, default=None): + '''Get the first value for key, returning default if missing.''' + # Avoid exception catch when handling default case; much faster. 
+ return chain(self.gets(key), (default,)).next() + + def getint(self, key, default=None, base=0): + '''Get the first value for key converted it to an int, returning + default if missing.''' + value = self.get(key, default) + if value is not default: + return int(value, base) + return value + + def getints(self, key, base=0): + '''Yield values for key in insertion order after converting to int.''' + return (int(v, base) for v in self.gets(key)) + + def getstring(self, key, default=None, encoding='utf-8'): + '''Get the first value for key decoded as unicode, returning default if + not found.''' + value = self.get(key, default) + if value is not default: + return value.decode(encoding) + return value + + def getstrings(self, key, encoding='utf-8'): + '''Yield values for key in insertion order after decoding as + unicode.''' + return (v.decode(encoding) for v in self.gets(key)) + + +class Writer(object): + '''Object for building new Constant Databases, and writing them to a + seekable file-like object.''' + + def __init__(self, fp, hashfn=djb_hash): + '''Create an instance writing to a file-like object, using hashfn to + hash keys.''' + self.fp = fp + self.hashfn = hashfn + + fp.write('\x00' * 2048) + self._unordered = [[] for i in xrange(256)] + + def put(self, key, value=''): + '''Write a string key/value pair to the output file.''' + assert type(key) is str and type(value) is str + + pos = self.fp.tell() + self.fp.write(write_2_le4(len(key), len(value))) + self.fp.write(key) + self.fp.write(value) + + h = self.hashfn(key) & 0xffffffff + self._unordered[h & 0xff].append((h, pos)) + + def puts(self, key, values): + '''Write more than one value for the same key to the output file. 
+ Equivalent to calling put() in a loop.''' + for value in values: + self.put(key, value) + + def putint(self, key, value): + '''Write an integer as a base-10 string associated with the given key + to the output file.''' + self.put(key, str(value)) + + def putints(self, key, values): + '''Write zero or more integers for the same key to the output file. + Equivalent to calling putint() in a loop.''' + self.puts(key, (str(value) for value in values)) + + def putstring(self, key, value, encoding='utf-8'): + '''Write a unicode string associated with the given key to the output + file after encoding it as UTF-8 or the given encoding.''' + self.put(key, unicode.encode(value, encoding)) + + def putstrings(self, key, values, encoding='utf-8'): + '''Write zero or more unicode strings to the output file. Equivalent to + calling putstring() in a loop.''' + self.puts(key, (unicode.encode(value, encoding) for value in values)) + + def finalize(self): + '''Write the final hash tables to the output file, and write out its + index. 
The output file remains open upon return.''' + index = [] + for tbl in self._unordered: + length = len(tbl) << 1 + ordered = [(0, 0)] * length + for pair in tbl: + where = (pair[0] >> 8) % length + for i in chain(xrange(where, length), xrange(0, where)): + if not ordered[i][0]: + ordered[i] = pair + break + + index.append((self.fp.tell(), length)) + for pair in ordered: + self.fp.write(write_2_le4(*pair)) + + self.fp.seek(0) + for pair in index: + self.fp.write(write_2_le4(*pair)) + self.fp = None # prevent double finalize() diff --git a/scap/config.py b/scap/config.py index 28cabab..764a15e 100644 --- a/scap/config.py +++ b/scap/config.py @@ -16,6 +16,8 @@ 'master_rsync': 'localhost', 'statsd_host': '127.0.0.1', 'statsd_port': 2003, + 'wmf_realm': 'production', + 'datacenter': 'pmtpa', } diff --git a/scap/tasks.py b/scap/tasks.py index 4e5c36b..cb7f652 100644 --- a/scap/tasks.py +++ b/scap/tasks.py @@ -5,12 +5,15 @@ Contains functions implementing scap tasks """ +import errno +import json import logging import os import random import socket import subprocess +from . import cdblib from . import log from . import ssh from . import utils @@ -34,6 +37,67 @@ """Run lint.php on `paths`; raise CalledProcessError if nonzero exit.""" cmd = '/usr/bin/php -n -dextension=parsekit.so /usr/local/bin/lint.php' return subprocess.check_call(cmd.split() + list(paths)) + + +def compile_wikiversions_cdb(cfg): + """Validate and compile the wikiversions.json file into a CDB database. + + 1. Find the realm specific filename for wikiversions.json in staging area + 2. Validate that all versions mentioned in the json exist as directories + in the staging area + 3. Validate that all wikis listed in the realm specific all.dblist exist + in the json + 4. Create a temporary CDB file from the json contents + 5. 
Atomically rename the temporary CDB to the realm specific + wikiversions.cdb filename + + :param cfg: Global configuration + """ + + # Find the realm specific wikiversions file names + base_file = os.path.join(cfg['stage_dir'], 'wikiversions.json') + json_file = utils.get_realm_specific_filename( + base_file, cfg['wmf_realm'], cfg['datacenter']) + cdb_file = '%s.cdb' % os.path.splitext(json_file)[0] + + with open(json_file) as f: + wikiversions = json.load(f) + + # Validate that all versions in the json file exist locally + for dbname, version in wikiversions.items(): + version_dir = os.path.join(cfg['stage_dir'], version) + if not os.path.isdir(version_dir): + raise IOError(errno.ENOENT, 'Invalid version dir', version_dir) + + # Get the list of all wikis + all_dblist_file = utils.get_realm_specific_filename( + os.path.join(cfg['stage_dir'], 'all.dblist'), + cfg['wmf_realm'], cfg['datacenter']) + all_dbs = set(line.strip() for line in open(all_dblist_file)) + + # Validate that all wikis are in the json file + missing_dbs = [db for db in wikiversions.keys() if db not in all_dbs] + if missing_dbs: + raise KeyError('Missing %d expected dbs in %s: %s' % ( + len(missing_dbs), json_file, ', '.join(missing_dbs))) + + tmp_cdb_file = '%s.tmp' % cdb_file + if os.path.exists(tmp_cdb_file): + os.unlink(tmp_cdb_file) + + # Write temp cdb file + with open(tmp_cdb_file, 'wb') as fp: + writer = cdblib.Writer(fp) + for dbname, version in wikiversions.items(): + writer.put(str('ver:%s' % dbname), str(version)) + writer.finalize() + os.fsync(fp.fileno()) + + if not os.path.isfile(tmp_cdb_file): + raise IOError(errno.ENOENT, 'Failed to create CDB', tmp_cdb_file) + + os.rename(tmp_cdb_file, cdb_file) + os.chmod(cdb_file, 0664) def sync_common(cfg, sync_from=None): @@ -124,8 +188,7 @@ t.mark('scap-rebuild-cdbs') with log.Timer('syncing wikiversions.cdb', stats) as t: - subprocess.check_call('%(stage_dir)s/multiversion/' - 'refreshWikiversionsCDB' % cfg) + compile_wikiversions_cdb(cfg) 
ssh.cluster_monitor(mw_install_hosts, 'sudo -u mwdeploy /usr/bin/rsync -l ' '%(master_rsync)s::common/wikiversions.{json,cdb} ' diff --git a/scap/utils.py b/scap/utils.py index 0cca274..c8ef961 100644 --- a/scap/utils.py +++ b/scap/utils.py @@ -116,3 +116,38 @@ return host finally: s.close() + + +def get_realm_specific_filename(filename, realm, datacenter): + """Find the most specific file for the given realm and datacenter. + + The extension is separated from the filename and then recombined with the + realm and datacenter: + - base-realm-datacenter.ext + - base-realm.ext + - base-datacenter.ext + """ + base, ext = os.path.splitext(filename) + + if ext == '': + return filename + + parts = { + 'base': base, + 'realm': realm, + 'datacenter': datacenter, + 'ext': ext, + } + + possible = ( + '%(base)s-%(realm)s-%(datacenter)s%(ext)s', + '%(base)s-%(realm)s%(ext)s', + '%(base)s-%(datacenter)s%(ext)s', + ) + + for new_filename in (p % parts for p in possible): + if os.path.isfile(new_filename): + return new_filename + + # If all else fails, return the original filename + return filename -- To view, visit https://gerrit.wikimedia.org/r/116906 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie50be09e724b276a009037a4ee2fd059d1fd87dc Gerrit-PatchSet: 4 Gerrit-Project: mediawiki/tools/scap Gerrit-Branch: master Gerrit-Owner: BryanDavis <bda...@wikimedia.org> Gerrit-Reviewer: Chad <ch...@wikimedia.org> Gerrit-Reviewer: Hashar <has...@free.fr> Gerrit-Reviewer: Ori.livneh <o...@wikimedia.org> Gerrit-Reviewer: Reedy <re...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits