Yurik has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/154230

Change subject: Refactored sms & web parsing to common base class
......................................................................

Refactored sms & web parsing to common base class

Change-Id: Id294fabdae2b67b5d1b366455b83606a05513465
---
M scripts/api.py
D scripts/download.py
A scripts/logprocessor.py
R scripts/smsgraphs.py
A scripts/smslogs.py
M scripts/weblogs.py
6 files changed, 620 insertions(+), 667 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/zero-sms refs/changes/30/154230/1

diff --git a/scripts/api.py b/scripts/api.py
index d980ab9..62ad555 100644
--- a/scripts/api.py
+++ b/scripts/api.py
@@ -11,6 +11,16 @@
 import urlparse


+class AttrDict(dict):
+    """
+    Taken from http://stackoverflow.com/questions/4984647/accessing-dict-keys-like-an-attribute-in-python/25320214
+    But it seems we should at some point switch to https://pypi.python.org/pypi/attrdict
+    """
+    def __init__(self, *args, **kwargs):
+        super(AttrDict, self).__init__(*args, **kwargs)
+        self.__dict__ = self
+
+
 class ConsoleLog(object):
     """
     Basic console logger. Most frameworks would probably want to implement their own.
@@ -117,7 +127,7 @@
             request_kw['params'] = kwargs

         result = self.request(method, forceSSL=forceSSL, **request_kw)
-        data = result.json()
+        data = result.json(object_hook=AttrDict)

         # Handle success and failure
         if 'error' in data:
diff --git a/scripts/download.py b/scripts/download.py
deleted file mode 100644
index 6b09ef0..0000000
--- a/scripts/download.py
+++ /dev/null
@@ -1,332 +0,0 @@
-# coding=utf-8
-import pipes
-import string
-import subprocess
-import locale
-from datetime import datetime, timedelta
-import re
-import os
-import json
-import traceback
-
-from boto.s3.connection import S3Connection
-import io
-import itertools
-import generate
-
-
-def generatePassword(size=10, chars=string.ascii_letters + string.digits):
-    """ Adapted from
-    http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
-    """
-    import random
-    return ''.join(random.choice(chars) for _ in range(size))
-
-
-class Processor(object):
-    dateFormat = '%Y-%m-%d'
-
-    def __init__(self, dataDir='data', workDir='state', graphDir='graphs', settingsFile='settings.json'):
-        self.dataDir = dataDir
-        self.workDir = workDir
-        self.graphDir = graphDir
-        if not os.path.exists(dataDir): os.mkdir(dataDir)
-        if not os.path.exists(workDir): os.mkdir(workDir)
-        if not os.path.exists(graphDir): os.mkdir(graphDir)
-
-        self.settingsFilePath = os.path.join(self.workDir, settingsFile)
-        self.combinedFilePath = os.path.join(self.workDir, 'combined.tsv')
-        self.statsFilePath = os.path.join(self.workDir, 'combined.json')
-
-        data = self.loadState()
-        self.awsBucket = data['awsBucket'] if 'awsBucket' in data else generatePassword()
-        self.awsKeyId = data['awsKeyId'] if 'awsKeyId' in data else generatePassword()
-        self.awsSecret = data['awsSecret'] if 'awsSecret' in data else generatePassword()
-        self.awsUser = data['awsUser'] if 'awsUser' in data else generatePassword()
-        self.downloadOverlapDays = data['downloadOverlapDays'] if 'downloadOverlapDays' in data else False
-        self.enableDownload = data['enableDownload'] if 'enableDownload' in data else True
-        self.enableDownloadOld = data['enableDownloadOld'] if 'enableDownloadOld' in data else True
-        self.lastDownloadTs = self.parseDate(data['lastDownloadTs']) if 'lastDownloadTs' in data else False
-        self.lastErrorMsg = data['lastErrorMsg'] if 'lastErrorMsg' in data else False
-        self.lastErrorTs = self.parseDate(data['lastErrorTs']) if 'lastErrorTs' in data else False
-        self.lastGoodRunTs = self.parseDate(data['lastGoodRunTs']) if 'lastGoodRunTs' in data else False
-        self.lastProcessedTs = self.parseDate(data['lastProcessedTs']) if 'lastProcessedTs' in data else False
-        self.partnerDirMap = data['partnerDirMap'] if 'partnerDirMap' in data else {}
-        self.partnerMap = data['partnerMap'] if 'partnerMap' in data else {}
-        self.processOverlapDays = data['processOverlapDays'] if 'processOverlapDays' in data else 1
-        self.salt = data['salt'] if 'salt' in data else generatePassword()
-        self.smtpFrom = data['smtpFrom'] if 'smtpFrom' in data else False
-        self.smtpHost = data['smtpHost'] if 'smtpHost' in data else False
-        self.smtpTo = data['smtpTo'] if 'smtpTo' in data else False
-        self.sortCmd = data['sortCmd'] if 'sortCmd' in data else 'sort'
-
-        if self.downloadOverlapDays and self.lastDownloadTs:
-            self.downloadIfAfter = self.lastDownloadTs - timedelta(days=self.downloadOverlapDays)
-        else:
-            self.downloadIfAfter = False
-
-        if self.lastProcessedTs:
-            self.processIfAfter = self.lastProcessedTs - timedelta(days=self.processOverlapDays)
-        else:
-            self.processIfAfter = False
-
-        # wikipedia_application_3.log.2014-06-11
-        self.fileRe = re.compile(ur'^wikipedia_application_\d+\.log\.(?P<date>\d{4}-\d{2}-\d{2})$', re.IGNORECASE)
-
-    def loadState(self):
-        if os.path.isfile(self.settingsFilePath):
-            with io.open(self.settingsFilePath, 'rb') as f:
-                return json.load(f)
-        return {}
-
-    def saveState(self):
-        data = self.loadState()
-        data['awsBucket'] = self.awsBucket
-        data['awsKeyId'] = self.awsKeyId
-        data['awsSecret'] = self.awsSecret
-        data['awsUser'] = self.awsUser
-        data['downloadOverlapDays'] = int(self.downloadOverlapDays) if self.downloadOverlapDays else False
-        data['enableDownload'] = self.enableDownload
-        data['enableDownloadOld'] = self.enableDownloadOld
-        data['lastDownloadTs'] = self.formatDate(self.lastDownloadTs)
-        data['lastErrorMsg'] = self.lastErrorMsg
-        data['lastErrorTs'] = self.formatDate(self.lastErrorTs)
-        data['lastGoodRunTs'] = self.formatDate(self.lastGoodRunTs)
-        data['lastProcessedTs'] = self.formatDate(self.lastProcessedTs)
-        data['partnerDirMap'] = self.partnerDirMap
-        data['partnerMap'] = self.partnerMap
-        data['processOverlapDays'] = int(self.processOverlapDays) if self.processOverlapDays else False
-        data['salt'] = self.salt
-        data['smtpFrom'] = self.smtpFrom
-        data['smtpHost'] = self.smtpHost
-        data['smtpTo'] = self.smtpTo
-        data['sortCmd'] = self.sortCmd
-
-        stateBk = self.settingsFilePath + '.bak'
-        with open(stateBk, 'wb') as f:
-            json.dump(data, f, indent=True, sort_keys=True)
-        if os.path.exists(self.settingsFilePath):
-            os.remove(self.settingsFilePath)
-        os.rename(stateBk, self.settingsFilePath)
-
-    def parseDate(self, value):
-        return datetime.strptime(str(value), self.dateFormat) if isinstance(value, basestring) else value
-
-    def formatDate(self, value):
-        return value.strftime(self.dateFormat) if isinstance(value, datetime) else value
-
-    def getFileDate(self, filename):
-        m = self.fileRe.match(filename)
-        return self.parseDate(m.group('date')) if m else False
-
-    def download(self):
-        print('\nDownloading files')
-
-        cn = S3Connection(self.awsKeyId, self.awsSecret)
-        prefix = 'prd-vumi-wikipedia.aws.prk-host.net/'
-        bucket = cn.get_bucket(self.awsBucket)
-        files = bucket.list(prefix)
-
-        for key in files:
-            filename = key.key[len(prefix):]
-            filePath = os.path.join(self.dataDir, filename)
-            fileDate = self.getFileDate(filename)
-            fileExists = os.path.exists(filePath)
-
-            if not self.enableDownloadOld and not fileDate:
-                print('Skipping legacy-named file %s' % filename)
-                continue
-            elif key.size == 0:
-                print('Skipping empty file %s' % filename)
-                continue
-            elif not fileExists:
-                reason = u"it doesn't exist"
-            elif key.size != os.stat(filePath).st_size:
-                reason = u'local size %s <> remote %s' % (
-                    locale.format(u"%d", os.stat(filePath).st_size, grouping=True),
-                    locale.format(u"%d", key.size, grouping=True))
-            elif fileDate and self.downloadIfAfter and fileDate > self.downloadIfAfter:
-                reason = u'date is too close to last file date %s' % self.downloadIfAfter
-            else:
-                continue
-
-            print('Downloading %s because %s' % (filename, reason))
-            if fileExists:
-                if os.stat(filePath).st_size == 0:
-                    print('Removing empty file %s' % filePath)
-                    os.remove(filePath)
-                else:
-                    bakCount = 0
-                    bakFile = filePath + '.bak'
-                    while os.path.exists(bakFile):
-                        bakCount += 1
-                        bakFile = filePath + '.bak' + str(bakCount)
-                    print('Renaming %s => %s' % (filePath, bakFile))
-                    os.rename(filePath, bakFile)
-
-            key.get_contents_to_filename(filePath)
-            if fileDate and (not self.lastDownloadTs or self.lastDownloadTs < fileDate):
-                self.lastDownloadTs = fileDate
-
-    def combineDataFiles(self, sourceFiles):
-
-        print('Combining files into %s' % self.combinedFilePath)
-        print('Processing %s' % (('files on or after %s' % self.processIfAfter) if self.processIfAfter else 'all files'))
-
-        appendingDataFile = self.combinedFilePath + '.tmp'
-        manualLogRe = re.compile(ur'^wikipedia_application_\d+\.log\.\d+\.gz\:')
-
-        totalCount = 0
-        with io.open(appendingDataFile, 'w', encoding='utf8') as dst:
-            for srcFile in sourceFiles:
-
-                fileDate = self.getFileDate(srcFile)
-                if self.processIfAfter:
-                    if not fileDate:
-                        continue  # old style filename, and the processIfAfter is set
-                    elif fileDate <= self.processIfAfter:
-                        continue  # we have already processed this file
-
-                srcFilePath = os.path.join(self.dataDir, srcFile)
-                if not os.path.isfile(srcFilePath):
-                    print('File %s was not found, skipping' % srcFilePath)
-                    continue
-                last = False
-                count = 0
-                for line in io.open(srcFilePath, 'r', encoding='utf8'):
-                    count += 1
-                    totalCount += 1
-                    if count == 1 or totalCount % 30000 == 0:
-                        print('File %s, line %d, total lines %d' % (srcFile, count - 1, totalCount - 1))
-
-                    l = line.strip(u'\n\r')
-                    l = manualLogRe.sub('', l, 1)
-                    if u' WIKI\t' in l:
-                        self.writeLine(dst, last)
-                        last = l
-                    elif len(l) > 2 and l[0] == u'2' and l[1] == u'0':
-                        self.writeLine(dst, last)
-                        last = False
-                    elif isinstance(last, basestring):
-                        last = last + '\t' + l
-
-                self.writeLine(dst, last)
-                if fileDate and (not self.lastProcessedTs or self.lastProcessedTs < fileDate):
-                    self.lastProcessedTs = fileDate
-
-        if totalCount > 0:
-            # Sort files into one
-            sortedOutputFile = self.combinedFilePath + '.out'
-            if os.path.exists(sortedOutputFile): os.remove(sortedOutputFile)
-
-            args = [self.sortCmd, '-u', '-o', sortedOutputFile, appendingDataFile]
-            originalExists = os.path.exists(self.combinedFilePath)
-            if originalExists:
-                args.append(self.combinedFilePath)
-            cmd = ' '.join([pipes.quote(v) for v in args])
-            print('\nSorting: %s' % cmd)
-            try:
-                tmp2 = sortedOutputFile + '2'
-                if os.path.exists(tmp2):
-                    os.remove(tmp2)
-
-                subprocess.check_output(args, stderr=subprocess.STDOUT)
-
-                # Extra safety - keep old file until we rename temp to its name
-                if originalExists:
-                    os.rename(self.combinedFilePath, tmp2)
-                os.rename(sortedOutputFile, self.combinedFilePath)
-                if originalExists:
-                    os.remove(tmp2)
-
-            except subprocess.CalledProcessError, ex:
-                raise Exception(u'Error %s running %s\nOutput:\n%s' % (ex.returncode, cmd, ex.output))
-
-        os.remove(appendingDataFile)
-
-    def writeLine(self, dst, line):
-        if not line:
-            return
-        line = line.replace(u'\0', u'\\0')
-        parts = line.split('\t')
-        if parts[1][0] == u'+':
-            return
-        parts = [p[2:-1]
-                 if (p.startswith(u"u'") and p.endswith(u"'")) or (p.startswith(u'u"') and p.endswith(u'"'))
-                 else p for p in parts]
-        tmp = parts[0]
-        parts[0] = parts[1]
-        parts[1] = tmp\
-            .replace(u' [VumiRedis,client]', u'') \
-            .replace(u' [HTTP11ClientProtocol,client]', u'') \
-            .replace(u' WIKI', u'') \
-            .replace(u'+0000', u'')
-
-        if len(parts) > 5 and parts[5].startswith(u'content='):
-            parts[5] = u'content=' + str(len(parts[5]) - 10)
-
-        if len(parts) > 6:
-            parts[6] = parts[6].replace('\0', '\\0')
-
-        dst.write('\t'.join(parts) + '\n')
-
-    def error(self, error):
-        self.lastErrorTs = datetime.now()
-        self.lastErrorMsg = error
-
-        print(error)
-
-        if not self.smtpHost or not self.smtpFrom or not self.smtpTo:
-            return
-
-        import smtplib
-        from email.mime.text import MIMEText
-        from email.mime.multipart import MIMEMultipart
-
-        msg = MIMEMultipart('alternative')
-        msg['Subject'] = 'SMS report error'
-        msg.attach(MIMEText(error, 'plain', 'utf-8'))
-
-        # m = MIMEText(error, 'plain', 'utf-8')
-        # m['From'] = self.smtpFrom
-        # m['To'] = self.smtpTo
-        # m['Subject'] = msg['Subject']
-
-        s = smtplib.SMTP(self.smtpHost)
-        s.sendmail(self.smtpFrom, self.smtpTo, msg.as_string().encode('ascii'))
-        s.quit()
-
-    def generateGraphData(self, skipParsing=False):
-        stats = generate.Stats(self.combinedFilePath, self.graphDir, self.statsFilePath, self.partnerMap, self.partnerDirMap, self.salt)
-
-        if not skipParsing:
-            print('\nParsing data')
-            stats.process()
-            stats.pickle()
-        else:
-            print('Loading parsed data')
-            stats.unpickle()
-
-        print('Generating data files to %s' % self.graphDir)
-        # stats.dumpStats()
-        stats.createGraphs()
-
-    def run(self):
-        # noinspection PyBroadException
-        try:
-            if self.enableDownload:
-                self.download()
-            files = os.listdir(self.dataDir)
-            files = itertools.chain([os.path.join('pc', f) for f in os.listdir(os.path.join(self.dataDir, 'pc'))], files)
-            self.combineDataFiles(files)
-
-            self.generateGraphData()
-            self.lastGoodRunTs = datetime.now()
-        except:
-            self.error(traceback.format_exc())
-        self.saveState()
-
-
-if __name__ == "__main__":
-    prc = Processor()
-    prc.run()
diff --git a/scripts/logprocessor.py b/scripts/logprocessor.py
new file mode 100644
index 0000000..a11a18c
--- /dev/null
+++ b/scripts/logprocessor.py
@@ -0,0 +1,196 @@
+from datetime import datetime
+import io
+from itertools import chain, imap
+import json
+import os
+import traceback
+from unidecode import unidecode
+from api import AttrDict
+
+validSites = {
+    u'wikipedia',
+    u'wikimedia',
+    u'wiktionary',
+    u'wikisource',
+    u'wikibooks',
+    u'wikiquote',
+    u'mediawiki',
+    u'wikimediafoundation',
+    u'wikiversity',
+    u'wikinews',
+    u'wikivoyage',
+}
+
+
+def safePrint(text):
+    print(unidecode(unicode(text)))
+
+
+def joinValues(vals, separator=u'\t', colCount=0):
+    if 0 < colCount != len(vals):
+        raise ValueError(u'Cannot save value that should have %d columns, not %d\n%s' %
+                         (colCount, len(vals), joinValues(vals, u',')))
+    return unicode(separator).join([unicode(v) for v in vals])
+
+
+def writeData(filename, data, columns, separator=u'\t'):
+    colCount = len(columns)
+    with io.open(filename, 'w', encoding='utf8', errors='ignore') as out:
+        out.writelines(
+            chain(
+                [joinValues(columns, separator) + '\n'],  # header
+                imap(lambda vals: joinValues(vals, separator, colCount) + '\n', data)))
+
+
+def readData(filename, colCount=0, separator=u'\t'):
+    """
+    :type filename str|unicode
+    :type colCount int|list
+    :type separator str|unicode
+    :return:
+    """
+    if type(colCount) is list:
+        colCount = len(colCount)
+    isFirst = colCount > 0
+    if not isFirst:
+        colCount = -colCount
+    with io.open(filename, 'r', encoding='utf8', errors='ignore') as inp:
+        for line in inp:
+            vals = line.strip(u'\r\n').split(separator)
+            if 0 < colCount != len(vals):
+                raise ValueError('This value should have %d columns, not %d: %s in file %s' %
+                                 (colCount, len(vals), joinValues(vals, u','), filename))
+            if isFirst:
+                isFirst = False
+                continue
+            yield vals
+
+
+def update(a, b):
+    for key in b:
+        if key in a:
+            if isinstance(a[key], dict) and isinstance(b[key], dict):
+                update(a[key], b[key])
+        else:
+            a[key] = b[key]
+    return a
+
+
+class LogProcessor(object):
+    def __init__(self, settingsFile, pathSuffix):
+
+        self.dateFormat = '%Y-%m-%d'
+        self.dateTimeFormat = '%Y-%m-%d %H:%M:%S'
+
+        self.settingsFile = self.normalizePath(settingsFile, False)
+
+        settings = self.defaultSettings(pathSuffix)
+        if os.path.isfile(self.settingsFile):
+            with io.open(self.settingsFile, 'rb') as f:
+                settings = update(json.load(f, object_hook=AttrDict), settings)
+        self.settings = settings
+        self.onSettingsLoaded()
+
+        if not self.settings.pathLogs or not self.settings.pathCache or not self.settings.pathGraphs:
+            raise ValueError('One of the paths is not set, check %s' % settingsFile)
+
+        self.pathLogs = self.normalizePath(self.settings.pathLogs)
+        self.pathCache = self.normalizePath(self.settings.pathCache)
+        self.pathGraphs = self.normalizePath(self.settings.pathGraphs)
+
+    def saveSettings(self):
+        self.onSavingSettings()
+        try:
+            filename = self.settingsFile
+            backup = filename + '.bak'
+            with open(backup, 'wb') as f:
+                json.dump(self.settings, f, indent=True, sort_keys=True)
+            if os.path.exists(filename):
+                os.remove(filename)
+            os.rename(backup, filename)
+        finally:
+            self.onSettingsLoaded()
+
+    def normalizePath(self, path, relToSettings=True):
+        if not os.path.isabs(path) and relToSettings:
+            path = os.path.join(os.path.dirname(self.settingsFile), path)
+        path = os.path.abspath(os.path.normpath(path))
+        dirPath = path if relToSettings else os.path.dirname(path)
+        if not os.path.exists(dirPath):
+            os.makedirs(dirPath)
+        return path
+
+    def error(self, error):
+        self.settings.lastErrorTs = datetime.now()
+        self.settings.lastErrorMsg = error
+
+        safePrint(error)
+
+        if not self.settings.smtpHost or not self.settings.smtpFrom or not self.settings.smtpTo:
+            return
+
+        import smtplib
+        from email.mime.text import MIMEText
+        from email.mime.multipart import MIMEMultipart
+
+        msg = MIMEMultipart('alternative')
+        msg['Subject'] = 'SMS report error'
+        msg.attach(MIMEText(error, 'plain', 'utf-8'))
+
+        # m = MIMEText(error, 'plain', 'utf-8')
+        # m['From'] = self.smtpFrom
+        # m['To'] = self.smtpTo
+        # m['Subject'] = msg['Subject']
+
+        smtp = smtplib.SMTP(self.settings.smtpHost)
+        smtp.sendmail(self.settings.smtpFrom, self.settings.smtpTo, msg.as_string().encode('ascii'))
+        smtp.quit()
+
+    def defaultSettings(self, suffix):
+        if suffix:
+            suffix = suffix.strip('/\\')
+        suffix = os.sep + suffix if suffix else ''
+
+        s = AttrDict()
+
+        s.lastErrorMsg = ''
+        s.lastErrorTs = False
+        s.lastGoodRunTs = False
+
+        s.smtpFrom = False
+        s.smtpHost = False
+        s.smtpTo = False
+
+        s.pathLogs = 'logs' + suffix
+        s.pathCache = 'cache' + suffix
+        s.pathGraphs = 'graphs' + suffix
+
+        return s
+
+    def onSavingSettings(self):
+        s = self.settings
+        s.lastErrorTs = self.formatDate(s.lastErrorTs, self.dateTimeFormat)
+        s.lastGoodRunTs = self.formatDate(s.lastGoodRunTs, self.dateTimeFormat)
+
+    def onSettingsLoaded(self):
+        pass
+
+    # noinspection PyMethodMayBeStatic
+    def formatDate(self, value, dateFormat):
+        return value.strftime(dateFormat) if isinstance(value, datetime) else value
+
+    # noinspection PyMethodMayBeStatic
+    def parseDate(self, value, dateFormat):
+        return datetime.strptime(str(value), dateFormat) if isinstance(value, basestring) else value
+
+    def safeRun(self):
+        # noinspection PyBroadException
+        try:
+            self.run()
+            self.settings.lastGoodRunTs = datetime.now()
+        except:
+            self.error(traceback.format_exc())
+        self.saveSettings()
+
+    def run(self):
+        pass
diff --git a/scripts/generate.py b/scripts/smsgraphs.py
similarity index 82%
rename from scripts/generate.py
rename to scripts/smsgraphs.py
index 4c585a2..093fcc9 100644
--- a/scripts/generate.py
+++ b/scripts/smsgraphs.py
@@ -17,12 +17,12 @@
 # G. Of the number above ('F'), how many of those opted to receive more information
 # H. Average number of 'reply for more' requests during a single session (from the set of users in 'D')
 # I. Average number of SMS's sent (from the set of users in 'D')
-#J. Total number of SMS's sent (from the set of users in 'D')
+# J. Total number of SMS's sent (from the set of users in 'D')
 #
-#Hourly totals for 24-hour daily period
+# Hourly totals for 24-hour daily period
 #
-#A. Number of sessions initiated
-#B. Number of sessions initiated and sent an SMS response of any non-zero number of SMS's
+# A. Number of sessions initiated
+# B. Number of sessions initiated and sent an SMS response of any non-zero number of SMS's

 import re
 import unicodedata
@@ -55,8 +55,8 @@
 # State machine:
 #
 # start -> titles -> section -> ussdcontent -> smscontent(+) -> more-no-content
-#                      v                            v
-#                section-invalid             content-invalid
+#                       v                           v
+#                 section-invalid            content-invalid
 #
 # NOTES:
 # Sometimes smscontent appears in the logs before ussdcontent
@@ -118,8 +118,8 @@
     def values(self):
         return self.__dict__.values()

-    def __cmp__(self, dict):
-        return cmp(self.__dict__, dict)
+    def __cmp__(self, d):
+        return cmp(self.__dict__, d)

     def __contains__(self, item):
         return item in self.__dict__
@@ -169,8 +169,87 @@
         return 'sum' not in self.__dict__


+def splitKey(key):
+    isError = key.startswith(u'err-')
+    if isError:
+        key = key[len(u'err-'):]
+
+    if key.endswith(u'_unique'):
+        key = key[0:-len(u'_unique')]
+        tp = u'unique'
+    elif key.endswith(u'_usrmonth'):
+        key = key[0:-len(u'_usrmonth')]
+        tp = u'usrmonth'
+    elif key == u'newuser':
+        tp = key
+    else:
+        tp = u'total'
+
+    return isError, key, tp
+
+
+def filterData(data, isError=False, isNewUser=False, isUnique=False, knownState=True, includeStats=False,
+               yieldTuple=False):
+    for key, dates in data.items():
+        isErr, state, typ = splitKey(key)
+        if isErr != isError:
+            continue
+        if (typ == u'unique') != isUnique:
+            continue
+        if (typ == u'newuser') != isNewUser:
+            continue
+        if (state in stateNames) != knownState:
+            continue
+        state = stateNames[state]
+        for dateStr, e in dates.items():
+            if not dateStr.startswith(u'daily_'):
+                continue
+            ts = dateStr[len(u'daily_'):]
+            if yieldTuple:
+                yield (ts, state, e.count) if not includeStats else (ts, state, e.count, e.min, e.avg, e.max)
+            else:
+                res = {
+                    u'date': ts,
+                    u'state': state,
+                    u'count': e.count,
+                }
+                if includeStats:
+                    res[u'avg'] = e.avg
+                    res[u'min'] = e.min
+                    res[u'max'] = e.max
+                yield res
+
+
+def createStatesGraph(partnerDir, data, states):
+    d = sorted(filterData(data, yieldTuple=True),
+               key=lambda v:
+               v[0] + v[1])
+    # v[u'date'] + v[u'state'])
+    # groups = groupby(d, key=itemgetter(''))
+    # [{'type':k, 'items':[x[0] for x in v]} for k, v in groups]
+    # from itertools import groupby, islice
+    # from operator import itemgetter
+    # from collections import defaultdict
+
+    # probably splitting this up in multiple lines would be more readable
+    pivot = (
+        (
+            ts,
+            defaultdict(lambda: '', (islice(d, 1, None) for d in dd))
+        )
+        for ts, dd in groupby(d, itemgetter(0)))
+
+    resultFile = os.path.join(partnerDir, 'states-count-per-day.tsv')
+    with io.open(resultFile, 'w', encoding='utf8') as f:
+        f.write(u'date\t' + u'\t'.join(states) + u'\n')
+        for ts, counts in pivot:
+            f.write(ts + u'\t' + u'\t'.join(str(counts[s]) for s in states) + u'\n')
+
+
 class Stats(object):
     def __init__(self, sourceFile, graphDir, stateFile, partnerMap=None, partnerDirMap=None, salt=''):
+        self.newUserUnique = set()
+        self.unique = defaultdict(dict)
         self.sourceFile = sourceFile
         self.graphDir = graphDir
         self.stateFile = stateFile
@@ -221,7 +300,6 @@
             self.newUserUnique.add(userId)
             self._addStats(partner, u'newuser', key2)

-
     def _addStatsUnique(self, partner, stage, key2, userId):
         u = self.unique[stage]
         if key2 not in u:
@@ -231,12 +309,11 @@
         else:
             return
         self._addStats(partner, stage, key2)
-
    # if not isError:
    # key2 = u'hourly_' + ts.strftime(u'%Y-%m-%d %H') + u':00'
    # self._addStats(partner, key, key2, value)

-    def addStats(self, partner, stage, ts, userId, value=-1, isError=False):
+    def addStats(self, partner, stage, ts, userId, value=-1):
         # self._addStats(partner, key, u'_totals', value)
         # self._addStatsUnique(partner, stage + u'_unique', u'_totals', id)
@@ -262,8 +339,6 @@
                 self.addStats(entry.partner, k, ts, userId, v)

     def process(self):
-        self.unique = defaultdict(dict)
-        self.newUserUnique = set()

         cId = 0
         cTime = 1
@@ -321,10 +396,9 @@
                         parts[cPartner] = str(secondsFromStart)
                         del parts[cId]
                         fErr.write(u'\t'.join(parts) + u'\n')
-                    self.addStats(entry.partner, u'err--bad-transitions', timestamp, entry.id, secondsFromStart,
-                                  isError=True)
+                    self.addStats(entry.partner, u'err--bad-transitions', timestamp, entry.id, secondsFromStart)
                     key = (u'err-cont-' if isError else u'err-new-') + transition[0] + u'-' + transition[1]
-                    self.addStats(entry.partner, key, timestamp, entry.id, secondsFromStart, isError=True)
+                    self.addStats(entry.partner, key, timestamp, entry.id, secondsFromStart)
                     lastParts = False
                     isError = True
                 elif not isNew:
@@ -444,7 +518,8 @@
         sanitizedPartner = unicode(re.sub('[^\w\s-]', '', sanitizedPartner).strip().lower())
         sanitizedPartner = re.sub('[-\s]+', '-', sanitizedPartner)
         infoFile = os.path.join(dataDir, sanitizedPartner)
-        if not os.path.exists(infoFile): open(infoFile, 'a').close()
+        if not os.path.exists(infoFile):
+            open(infoFile, 'a').close()

         # Create dashboard
         dashboardFile = os.path.join(dashboard, partnerKey + '.json')
@@ -457,7 +532,8 @@
                 {
                     "name": "Graphs",
                     "graph_ids": [
-                        "http://gp.wmflabs.org/data/datafiles/gp_zero_local/" + partnerKey + "/states-count-per-day.tsv"
+                        "http://gp.wmflabs.org/data/datafiles/gp_zero_local/%s/states-count-per-day.tsv"
+                        % partnerKey
                     ]
                 }
             ]
@@ -472,88 +548,14 @@
         states = sorted([stateNames[v] for v in goodStates])

         for partner, data in self.stats.items():
-            if partner in self.partnerDirMap:
-                partnerKey = self.partnerDirMap[partner]
-            else:
+            if partner not in self.partnerDirMap:
                 import hashlib
+
                 partnerKey = hashlib.sha224(partner + self.salt).hexdigest()
                 self.partnerDirMap[partner] = partnerKey

             partnerDir = self.makePartnerDir(partner)
-            self.createStatesGraph(partnerDir, data, states)
-
-    def createStatesGraph(self, partnerDir, data, states):
-        d = sorted(self.filterData(data, yieldTuple=True),
-                   key=lambda v:
-                   v[0] + v[1])
-        # v[u'date'] + v[u'state'])
-        # groups = groupby(d, key=itemgetter(''))
-        # [{'type':k, 'items':[x[0] for x in v]} for k, v in groups]
-        # from itertools import groupby, islice
-        # from operator import itemgetter
-        # from collections import defaultdict
-
-        # probably splitting this up in multiple lines would be more readable
-        pivot = (
-            (ts,
-             defaultdict(lambda: '', (islice(d, 1, None) for d in dd))
-            )
-            for ts, dd in groupby(d, itemgetter(0)))
-
-        resultFile = os.path.join(partnerDir, 'states-count-per-day.tsv')
-        with io.open(resultFile, 'w', encoding='utf8') as f:
-            f.write(u'date\t' + u'\t'.join(states) + u'\n')
-            for ts, counts in pivot:
-                f.write(ts + u'\t' + u'\t'.join(str(counts[s]) for s in states)+u'\n')
-
-
-    def filterData(self, data, isError=False, isNewUser=False, isUnique=False, knownState=True, includeStats=False, yieldTuple=False):
-        for key, dates in data.items():
-            isErr, state, type = self.splitKey(key)
-            if isErr != isError:
-                continue
-            if (type == u'unique') != isUnique:
-                continue
-            if (type == u'newuser') != isNewUser:
-                continue
-            if (state in stateNames) != knownState:
-                continue
-            state = stateNames[state]
-            for dateStr, e in dates.items():
-                if not dateStr.startswith(u'daily_'):
-                    continue
-                ts = dateStr[len(u'daily_'):]
-                if yieldTuple:
-                    yield (ts, state, e.count) if not includeStats else (ts, state, e.count, e.min, e.avg, e.max)
-                else:
-                    res = {
-                        u'date': ts,
-                        u'state': state,
-                        u'count': e.count,
-                    }
-                    if includeStats:
-                        res[u'avg'] = e.avg
-                        res[u'min'] = e.min
-                        res[u'max'] = e.max
-                    yield res
-
-    def splitKey(self, key):
-        isError = key.startswith(u'err-')
-        if isError:
-            key = key[len(u'err-'):]
-
-        if key.endswith(u'_unique'):
-            key = key[0:-len(u'_unique')]
-            tp = u'unique'
-        elif key.endswith(u'_usrmonth'):
-            key = key[0:-len(u'_usrmonth')]
-            tp = u'usrmonth'
-        elif key == u'newuser':
-            tp = key
-        else:
-            tp = u'total'
-
-        return isError, key, tp
+            createStatesGraph(partnerDir, data, states)


 if __name__ == '__main__':
diff --git a/scripts/smslogs.py b/scripts/smslogs.py
new file mode 100644
index 0000000..e01d85a
--- /dev/null
+++ b/scripts/smslogs.py
@@ -0,0 +1,265 @@
+# coding=utf-8
+import pipes
+import string
+import subprocess
+import locale
+from datetime import timedelta
+import re
+import traceback
+import itertools
+
+from boto.s3.connection import S3Connection
+
+import smsgraphs
+from logprocessor import *
+
+
+def generatePassword(size=10, chars=string.ascii_letters + string.digits):
+    """ Adapted from
+    http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
+    """
+    import random
+
+    return ''.join(random.choice(chars) for _ in range(size))
+
+
+class SmsLogProcessor(LogProcessor):
+    dateFormat = '%Y-%m-%d'
+
+    def __init__(self, settingsFile='settings/smslogs.json'):
+        super(SmsLogProcessor, self).__init__(settingsFile, 'web')
+
+        self.combinedFilePath = os.path.join(self.pathCache, 'combined.tsv')
+        self.statsFilePath = os.path.join(self.pathCache, 'combined.json')
+
+        if self.settings.downloadOverlapDays and self.settings.lastDownloadTs:
+            self.downloadIfAfter = self.settings.lastDownloadTs - timedelta(days=self.settings.downloadOverlapDays)
+        else:
+            self.downloadIfAfter = False
+
+        if self.settings.lastProcessedTs:
+            self.processIfAfter = self.settings.lastProcessedTs - timedelta(days=self.settings.processOverlapDays)
+        else:
+            self.processIfAfter = False
+
+        # wikipedia_application_3.log.2014-06-11
+        self.fileRe = re.compile(r'^wikipedia_application_\d+\.log\.(?P<date>\d{4}-\d{2}-\d{2})$', re.IGNORECASE)
+
+    def defaultSettings(self, suffix):
+        s = super(SmsLogProcessor, self).defaultSettings(suffix)
+        s.awsBucket = generatePassword()
+        s.awsKeyId = generatePassword()
+        s.awsSecret = generatePassword()
+        s.awsPrefix = ''
+        s.awsUser = generatePassword()
+        s.downloadOverlapDays = 0
+        s.enableDownload = True
+        s.enableDownloadOld = True
+        s.lastDownloadTs = False
+        s.lastProcessedTs = False
+        s.partnerDirMap = {}
+        s.partnerMap = {}
+        s.processOverlapDays = 1
+        s.salt = generatePassword()
+        s.sortCmd = 'sort'
+        return s
+
+    def onSavingSettings(self):
+        super(SmsLogProcessor, self).onSavingSettings()
+        s = self.settings
+        s.lastDownloadTs = self.formatDate(s.lastDownloadTs, self.dateFormat)
+        s.lastProcessedTs = self.formatDate(s.lastProcessedTs, self.dateFormat)
+
+    def onSettingsLoaded(self):
+        super(SmsLogProcessor, self).onSettingsLoaded()
+        s = self.settings
+        s.lastDownloadTs = self.parseDate(s.lastDownloadTs, self.dateFormat)
+        s.lastProcessedTs = self.parseDate(s.lastProcessedTs, self.dateFormat)
+
+    def getFileDate(self, filename):
+        m = self.fileRe.match(filename)
+        return self.parseDate(m.group('date'), self.dateFormat) if m else False
+
+    def download(self):
+        safePrint(u'\nDownloading files')
+
+        cn = S3Connection(self.settings.awsKeyId, self.settings.awsSecret)
+
+        bucket = cn.get_bucket(self.settings.awsBucket)
+        files = bucket.list(self.settings.awsPrefix)
+
+        for key in files:
+            filename = key.key[len(self.settings.awsPrefix):]
+            filePath = os.path.join(self.pathLogs, filename)
+            fileDate = self.getFileDate(filename)
+            fileExists = os.path.exists(filePath)
+
+            if key.size == 0:
+                safePrint(u'Skipping empty file %s' % filename)
+                continue
+            elif not fileExists:
+                reason = u"it doesn't exist"
+            elif key.size != os.stat(filePath).st_size:
+                reason = u'local size %s <> remote %s' % (
+                    locale.format(u"%d", os.stat(filePath).st_size, grouping=True),
+                    locale.format(u"%d", key.size, grouping=True))
+            elif fileDate and self.downloadIfAfter and fileDate > self.downloadIfAfter:
+                reason = u'date is too close to last file date %s' % self.downloadIfAfter
+            else:
+                continue
+
+            if not self.settings.enableDownloadOld and not fileDate:
+                safePrint(u'Skipping legacy-named file %s even though %s' % (filename, reason))
+                continue
+
+            safePrint(u'Downloading %s because %s' % (filename, reason))
+            if fileExists:
+                if os.stat(filePath).st_size == 0:
+                    safePrint(u'Removing empty file %s' % filePath)
+                    os.remove(filePath)
+                else:
+                    bakCount = 0
+                    bakFile = filePath + '.bak'
+                    while os.path.exists(bakFile):
+                        bakCount += 1
+                        bakFile = filePath + '.bak' + str(bakCount)
+                    safePrint(u'Renaming %s => %s' % (filePath, bakFile))
+                    os.rename(filePath, bakFile)
+
+            key.get_contents_to_filename(filePath)
+            if fileDate and (not self.settings.lastDownloadTs or self.settings.lastDownloadTs < fileDate):
+                self.settings.lastDownloadTs = fileDate
+
+    def combineDataFiles(self, sourceFiles):
+
+        safePrint(u'Combining files into %s' % self.combinedFilePath)
+        if self.processIfAfter:
+            safePrint(u'Processing files on or after %s' % self.processIfAfter)
+        else:
+            safePrint(u'Processing all files')
+
+        appendingDataFile = self.combinedFilePath + '.tmp'
+        manualLogRe = re.compile(r'^wikipedia_application_\d+\.log\.\d+\.gz:')
+
+        totalCount = 0
+        with io.open(appendingDataFile, 'w', encoding='utf8') as dst:
+            for srcFile in sourceFiles:
+
+                fileDate = self.getFileDate(srcFile)
+                if self.processIfAfter:
+                    if not fileDate:
+                        continue  # old style filename, and the processIfAfter is set
+                    elif fileDate <= self.processIfAfter:
+                        continue  # we have already processed this file
+
+                srcFilePath = os.path.join(self.pathLogs, srcFile)
+                if not os.path.isfile(srcFilePath):
+                    safePrint(u'File %s was not found, skipping' % srcFilePath)
+                    continue
+                last = False
+                count = 0
+                for line in io.open(srcFilePath, 'r', encoding='utf8'):
+                    count += 1
+                    totalCount += 1
+                    if count == 1 or totalCount % 30000 == 0:
+                        safePrint(u'File %s, line %d, total lines %d' % (srcFile, count - 1, totalCount - 1))
+
+                    l = line.strip(u'\n\r')
+                    l = manualLogRe.sub('', l, 1)
+                    if u' WIKI\t' in l:
+                        self.writeLine(dst, last)
+                        last = l
+                    elif len(l) > 2 and l[0] == u'2' and l[1] == u'0':
+                        self.writeLine(dst, last)
+                        last = False
+                    elif isinstance(last, basestring):
+                        last = last + '\t' + l
+
+                self.writeLine(dst, last)
+                if fileDate and (not self.settings.lastProcessedTs or self.settings.lastProcessedTs < fileDate):
+                    self.settings.lastProcessedTs = fileDate
+
+        if totalCount > 0:
+            # Sort files into one
+            sortedOutputFile = self.combinedFilePath + '.out'
+            if os.path.exists(sortedOutputFile):
+                os.remove(sortedOutputFile)
+
+            args = [self.settings.sortCmd, '-u', '-o', sortedOutputFile, appendingDataFile]
+            originalExists = os.path.exists(self.combinedFilePath)
+            if originalExists:
+                args.append(self.combinedFilePath)
+            cmd = ' '.join([pipes.quote(v) for v in args])
+            safePrint(u'\nSorting: %s' % cmd)
+            try:
+                tmp2 = sortedOutputFile + '2'
+                if os.path.exists(tmp2):
+                    os.remove(tmp2)
+
+                subprocess.check_output(args, stderr=subprocess.STDOUT)
+
+                # Extra safety - keep old file until we rename temp to its name
+                if originalExists:
+                    os.rename(self.combinedFilePath, tmp2)
+                os.rename(sortedOutputFile, self.combinedFilePath)
+                if originalExists:
+                    os.remove(tmp2)
+
+            except subprocess.CalledProcessError, ex:
+                raise Exception(u'Error %s running %s\nOutput:\n%s' % (ex.returncode, cmd, ex.output))
+
+        os.remove(appendingDataFile)
+
+    def writeLine(self, dst, line):
+        if not line:
+            return
+        line = line.replace(u'\0', u'\\0')
+        parts = line.split('\t')
+        if parts[1][0] == u'+':
+            return
+        parts = [p[2:-1]
+                 if (p.startswith(u"u'") and p.endswith(u"'")) or (p.startswith(u'u"') and p.endswith(u'"'))
+                 else p for p in parts]
+        tmp = parts[0]
+        parts[0] = parts[1]
+        parts[1] = tmp \
+            .replace(u' [VumiRedis,client]', u'') \
+            .replace(u' [HTTP11ClientProtocol,client]', u'') \
+            .replace(u' WIKI', u'') \
+            .replace(u'+0000', u'')
+
+        if len(parts) > 5 and parts[5].startswith(u'content='):
+            parts[5] = u'content=' + str(len(parts[5]) - 10)
+
+        if len(parts) > 6:
+            parts[6] = parts[6].replace(u'\0', u'\\0')
+
+        dst.write(u'\t'.join(parts) + u'\n')
+
+    def generateGraphData(self, skipParsing=False):
+        stats = smsgraphs.Stats(self.combinedFilePath, self.pathGraphs, self.statsFilePath, self.settings.partnerMap,
+                                self.settings.partnerDirMap, self.settings.salt)
+        if not skipParsing:
+            safePrint(u'\nParsing data')
+            stats.process()
+            stats.pickle()
+        else:
+            safePrint(u'Loading parsed data')
+            stats.unpickle()
+
+        safePrint(u'Generating data files to %s' % self.pathGraphs)
+        # stats.dumpStats()
+        stats.createGraphs()
+
+    def run(self):
+        if self.settings.enableDownload:
+            self.download()
+        files = os.listdir(self.pathLogs)
+        files = itertools.chain([os.path.join('pc', f) for f in os.listdir(os.path.join(self.pathLogs, 'pc'))],
+                                files)
+        self.combineDataFiles(files)
+        self.generateGraphData()
+
+
+if __name__ == "__main__":
+    SmsLogProcessor().safeRun()
diff --git a/scripts/weblogs.py b/scripts/weblogs.py
index 891b85c..0b8ebd5 100644
--- a/scripts/weblogs.py
+++ b/scripts/weblogs.py
@@ -1,85 +1,11 @@
 # coding=utf-8
 import gzip
-from datetime import datetime
-import os
-import json
-import traceback
 import re
-import io
 import collections
 import sys
-from itertools import imap, ifilter, chain
+from itertools import ifilter

-try:
-    from unidecode import unidecode
-except ImportError:
-    unidecode = lambda txt: txt.encode('ascii', 'replace')
-
-
-def safePrint(text):
-    print(unidecode(unicode(text)))
-
-
-def saveJson(filename, data):
-    with open(filename, 'wb') as f:
-        json.dump(data, f, indent=True, sort_keys=True)
-
-
-def loadJson(filename):
-    with io.open(filename, 'rb') as f:
-        return json.load(f)
-
-
-def joinValues(vals, separator=u'\t', colCount=0):
-    if 0 < colCount != len(vals):
-        raise ValueError(u'Cannot save value that should have %d columns, not %d\n%s' %
-                         (colCount, len(vals), joinValues(vals, u',')))
-    return unicode(separator).join([unicode(v) for v in vals])
-
-
-def writeData(filename, data, columns, separator=u'\t'):
-    colCount = len(columns)
-    with io.open(filename, 'w', encoding='utf8', errors='ignore') as out:
-        out.writelines(
-            chain(
-                [joinValues(columns, separator) + '\n'],  # header
-                imap(lambda vals: joinValues(vals, separator, colCount) + '\n', data)))
-
-
-def readData(filename, colCount=0, separator=u'\t'):
-    """
-
-    :param filename:
-    :type colCount int|list
-    :param separator:
-    :return:
-    """
-    if type(colCount) is list:
-        colCount = len(colCount)
-    isFirst = colCount > 0
-    if not isFirst:
-        colCount = -colCount
-    with io.open(filename, 'r', encoding='utf8', errors='ignore') as inp:
-        for line in inp:
-            vals = line.strip(u'\r\n').split(separator)
-            if 0 < colCount != len(vals):
-                raise ValueError('This value should have %d columns, not %d: %s in file %s' %
-                                 (colCount, len(vals), joinValues(vals, u','), filename))
-            if isFirst:
-                isFirst = False
-                continue
-            yield vals
-
-
-def _sanitizeValue(values):
-    if values[1] == 'ERR':
-        values[8] = ''
-    return values
-
-
-def sanitizeToCsv(inpFile, outFile, columns):
-    writeData(outFile,
-              imap(_sanitizeValue, readData(inpFile, columns)), columns, u',')
+from logprocessor import *


 def addStat(stats, date, dataType, xcs, via, ipset, https, lang, subdomain, site):
@@ -95,47 +21,11 @@
 columnHeaders11 = u'date,type,xcs,via,ipset,https,lang,subdomain,site,zero,count'.split(',')
 validSubDomains = {'m', 'zero', 'mobile', 'wap'}
 validHttpCode = {'200', '304'}
-validSites = {
-    u'wikipedia',
-    u'wikimedia',
-    u'wiktionary',
-    u'wikisource',
-    u'wikibooks',
-    u'wikiquote',
-    u'mediawiki',
-    u'wikimediafoundation',
-    u'wikiversity',
-    u'wikinews',
-    u'wikivoyage',
-}


-class LogProcessor(object):
+class WebLogProcessor(LogProcessor):
     def __init__(self, settingsFile='settings/weblogs.json', logDatePattern=False):
-
-        self.settingsFile = self.normalizePath(settingsFile, False)
-
-        data = self.loadState()
-        self.lastErrorMsg = data['lastErrorMsg'] if 'lastErrorMsg' in data else False
-        self.lastErrorTs = data['lastErrorTs'] if 'lastErrorTs' in data else False
-        self.lastGoodRunTs = data['lastGoodRunTs'] if 'lastGoodRunTs' in data else False
-        self.lastProcessedTs = data['lastProcessedTs'] if 'lastProcessedTs' in data else False
-        self.smtpFrom = data['smtpFrom'] if 'smtpFrom' in data else False
-        self.smtpHost = data['smtpHost'] if 'smtpHost' in data else False
-        self.smtpTo = data['smtpTo'] if 'smtpTo' in data else False
-        self.username = data['apiUsername'] if 'apiUsername' in data else ''
-        self.password = data['apiPassword'] if 'apiPassword' in data else ''
-        self.rawPathLogs = data['pathLogs'] if 'pathLogs' in data else ''
-        self.rawPathStats = data['pathStats'] if 'pathStats' in data else ''
-        self.rawPathGraphs = data['pathGraphs'] if 'pathGraphs' in data else ''
-        self.saveState()
-
-        if not self.rawPathLogs or not self.rawPathStats or not self.rawPathGraphs:
-            raise ValueError('One of the paths is not set, check %s' % settingsFile)
-
-        self.pathLogs = self.normalizePath(self.rawPathLogs)
-        self.pathStats = self.normalizePath(self.rawPathStats)
-        self.pathGraphs = self.normalizePath(self.rawPathGraphs)
+        super(WebLogProcessor, self).__init__(settingsFile, 'web')

         # zero.tsv.log-20140808.gz
         if not logDatePattern:
@@ -147,52 +37,30 @@
         self.duplUrlRe = re.compile(r'^(https?://.+)\1', re.IGNORECASE)
         self.zcmdRe = re.compile(r'zcmd=([-a-z0-9]+)', re.IGNORECASE)

-    def saveState(self):
-        fmt = lambda v: v.strftime('%Y-%m-%d %H:%M:%S') if isinstance(v, datetime) else v
-
-        data = self.loadState()
-        data['lastErrorMsg'] = self.lastErrorMsg
-        data['lastErrorTs'] = fmt(self.lastErrorTs)
-        data['lastGoodRunTs'] = fmt(self.lastGoodRunTs)
-        data['lastProcessedTs'] = fmt(self.lastProcessedTs)
-        data['smtpFrom'] = self.smtpFrom
-        data['smtpHost'] = self.smtpHost
-        data['smtpTo'] = self.smtpTo
-        data['apiUsername'] = self.username
-        data['apiPassword'] = self.password
-        data['pathLogs'] = self.rawPathLogs
-        data['pathStats'] = self.rawPathStats
-        data['pathGraphs'] = self.rawPathGraphs
-
-        stateBk = self.settingsFile + '.bak'
-        saveJson(stateBk, data)
-        if os.path.exists(self.settingsFile):
-            os.remove(self.settingsFile)
-        os.rename(stateBk, self.settingsFile)
-
-    def loadState(self):
-        if os.path.isfile(self.settingsFile):
-            return loadJson(self.settingsFile)
-        return {}
+    def defaultSettings(self, suffix):
+        s = super(WebLogProcessor, self).defaultSettings(suffix)
+        s.apiUsername = ''
+        s.apiPassword = ''
+        return s

     def downloadConfigs(self):
         import api

         site = api.wikimedia('zero', 'wikimedia', 'https')
-        site.login(self.username, self.password)
+        site.login(self.settings.apiUsername, self.settings.apiPassword)

         # https://zero.wikimedia.org/w/api.php?action=zeroportal&type=analyticsconfig&format=jsonfm
-        configs = site('zeroportal', type='analyticsconfig')['zeroportal']
+        configs = site('zeroportal', type='analyticsconfig').zeroportal
         for cfs in configs.values():
             for c in cfs:
                 c['from'] = datetime.strptime(c['from'], '%Y-%m-%dT%H:%M:%SZ')
-                if c['before'] is None:
-                    c['before'] = datetime.max
+                if c.before is None:
+                    c.before = datetime.max
                 else:
-                    c['before'] = datetime.strptime(c['before'], '%Y-%m-%dT%H:%M:%SZ')
-                c['languages'] = True if True == c['languages'] else set(c['languages'])
-                c['sites'] = True if True == c['sites'] else set(c['sites'])
-                c['via'] = set(c['via'])
-                c['ipsets'] = set(c['ipsets'])
+                    c.before = datetime.strptime(c.before, '%Y-%m-%dT%H:%M:%SZ')
+                c.languages = True if True == c.languages else set(c.languages)
+                c.sites = True if True == c.sites else set(c.sites)
+                c.via = set(c.via)
+                c.ipsets = set(c.ipsets)
         return configs

     def processLogFiles(self):
@@ -205,7 +73,7 @@
                 continue
             logFile = os.path.join(self.pathLogs, f)
             logSize = os.stat(logFile).st_size
-            statFile = os.path.join(self.pathStats, f + '__' + unicode(logSize) + '.tsv')
+            statFile = os.path.join(self.pathCache, f + '__' + unicode(logSize) + '.tsv')
             statFiles[f] = statFile
             if not os.path.exists(statFile):
                 fileDt = m.group(1)
@@ -214,12 +82,12 @@

         # Clean up older stat files (if gz file size has changed)
         removeFiles = []
-        for f in os.listdir(self.pathStats):
+        for f in os.listdir(self.pathCache):
             m = self.statFileRe.match(f)
             if not m:
                 continue
             logFile = m.group(1)
-            statFile = os.path.join(self.pathStats, f)
+            statFile = os.path.join(self.pathCache, f)
             if logFile not in statFiles or statFiles[logFile] == statFile:
                 continue  # The log file has been deleted or its the latest
             removeFiles.append(statFile)
@@ -272,7 +140,7 @@
                     addStat(stats, fileDt, 'ERR', '000-00', 'ERR', 'ERR', False, '', 'analytics', '')
                     continue
                 verb = l[7]
-                analytics = dict([x.split('=', 2) for x in set(analytics.split(';'))])
+                analytics = AttrDict([x.split('=', 2) for x in set(analytics.split(';'))])
                 if 'zero' in analytics:
                     xcs = analytics['zero'].rstrip('|')
                 else:
@@ -350,10 +218,10 @@
         safePrint('Combine stat files')
         configs = self.downloadConfigs()
         stats = collections.defaultdict(int)
-        for f in os.listdir(self.pathStats):
+        for f in os.listdir(self.pathCache):
             if not self.statFileRe.match(f):
                 continue
-            for vals in readData(os.path.join(self.pathStats, f), columnHeaders10):
+            for vals in readData(os.path.join(self.pathCache, f), columnHeaders10):
                 # "0          1    2      3      4       5    6  7    8         9"
                 # "2014-07-25 DATA 250-99 DIRECT default http ru zero wikipedia 1000"
                 if len(vals) != 10:
@@ -377,14 +245,14 @@
                 site2 = subdomain + '.' + site
                 isZero = False
                 for conf in configs[xcs]:
-                    langs = conf['languages']
-                    sites = conf['sites']
-                    if conf['from'] <= dt < conf['before'] and \
-                            (conf['https'] or https == u'http') and \
+                    langs = conf.languages
+                    sites = conf.sites
+                    if conf['from'] <= dt < conf.before and \
+                            (conf.https or https == u'http') and \
                             (True == langs or lang in langs) and \
                             (True == sites or site2 in sites) and \
-                            (via in conf['via']) and \
-                            (ipset in conf['ipsets']):
+                            (via in conf.via) and \
+                            (ipset in conf.ipsets):
                         isZero = True
                         break
                 vals[9] = u'INCL' if isZero else u'EXCL'
@@ -403,95 +271,39 @@
         # convert {"a|b|c":count,...} into [[a,b,c,count],...]
         return [list(k) + [v] for k, v in stats.iteritems()]

-
     def generateGraphData(self, stats):
         safePrint('Generating data files to %s' % self.pathGraphs)

-    def error(self, error):
-        self.lastErrorTs = datetime.now()
-        self.lastErrorMsg = error
-
-        safePrint(error)
-
-        if not self.smtpHost or not self.smtpFrom or not self.smtpTo:
-            return
-
-        import smtplib
-        from email.mime.text import MIMEText
-        from email.mime.multipart import MIMEMultipart
-
-        msg = MIMEMultipart('alternative')
-        msg['Subject'] = 'SMS report error'
-        msg.attach(MIMEText(error, 'plain', 'utf-8'))
-
-        # m = MIMEText(error, 'plain', 'utf-8')
-        # m['From'] = self.smtpFrom
-        # m['To'] = self.smtpTo
-        # m['Subject'] = msg['Subject']
-
-        smtp = smtplib.SMTP(self.smtpHost)
-        smtp.sendmail(self.smtpFrom, self.smtpTo, msg.as_string().encode('ascii'))
-        smtp.quit()
-
-    def run(self):
-        # noinspection PyBroadException
-        try:
-            self.processLogFiles()
-            stats = self.combineStats()
-            self.generateGraphData(stats)
-            self.lastGoodRunTs = datetime.now()
-        except:
-            self.error(traceback.format_exc())
-        self.saveState()
-
-    def manualRun(self):
-        # prc.reformatArch()
-
-        # prc.processLogFiles()
-        stats = self.combineStats()
-
-        writeData(os.path.join(self.pathStats, 'combined-all.tsv'), stats, columnHeaders11)
-
-        writeData(os.path.join(self.pathStats, 'combined-errors.tsv'),
+        writeData(os.path.join(self.pathGraphs, 'combined-all.tsv'),
+                  stats, columnHeaders11)
+        writeData(os.path.join(self.pathGraphs, 'combined-errors.tsv'),
                   ifilter(lambda v: v[1] == 'ERR', stats), columnHeaders11)
-
-        writeData(os.path.join(self.pathStats, 'combined-stats.tsv'),
+        writeData(os.path.join(self.pathGraphs, 'combined-stats.tsv'),
                   ifilter(lambda v: v[1] == 'STAT', stats), columnHeaders11)
-
-        writeData(os.path.join(self.pathStats, 'combined-data.tsv'),
+        writeData(os.path.join(self.pathGraphs, 'combined-data.tsv'),
                   ifilter(lambda v: v[1] == 'DATA', stats), columnHeaders11)

+    def run(self):
+        self.processLogFiles()
+        stats = self.combineStats()
+        self.generateGraphData(stats)
+
+    def manualRun(self):
+        pass
+        # prc.reformatArch()
+        # prc.processLogFiles()
+        # stats = self.combineStats()
         # file = r'c:\Users\user\mw\shared\zero-sms\data\weblogs\zero.tsv.log-20140808.gz'
         # prc.processLogFile(file, file + '.json')
-
-        # file = r'c:\Users\user\mw\shared\zero-sms\data\weblogs\zero.tsv.log-20140808.gz'
-        # prc.processLogFile(file, file + '.json')
-
         # prc.downloadConfigs()
-
-
-    def normalizePath(self, path, relToSettings=True):
-        if not os.path.isabs(path) and relToSettings:
-            path = os.path.join(os.path.dirname(self.settingsFile), path)
-        path = os.path.abspath(os.path.normpath(path))
-        dirPath = path if relToSettings else os.path.dirname(path)
-        if not os.path.exists(dirPath):
-            os.makedirs(dirPath)
-        return path
-
-
-    def reformatArch(self):
-        for f in os.listdir(self.pathStats):
-            if not self.statFileRe.match(f):
-                continue
-            pth = os.path.join(self.pathStats, f)
-            writeData(pth + '.new', readData(pth, -len(columnHeaders10)), columnHeaders10)
-            os.rename(pth, pth + '.old')
+        # for f in os.listdir(self.pathCache):
+        #     if not self.statFileRe.match(f):
+        #         continue
+        #     pth = os.path.join(self.pathCache, f)
+        #     writeData(pth + '.new', readData(pth, -len(columnHeaders10)), columnHeaders10)
+        #     os.rename(pth, pth + '.old')


 if __name__ == "__main__":
-    prc = LogProcessor(logDatePattern=(sys.argv[1] if len(sys.argv) > 1 else False))
-    # prc.run()
-    prc.manualRun()
+    WebLogProcessor(logDatePattern=(sys.argv[1] if len(sys.argv) > 1 else False)).safeRun()

--
To view, visit https://gerrit.wikimedia.org/r/154230
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id294fabdae2b67b5d1b366455b83606a05513465
Gerrit-PatchSet: 1
Gerrit-Project: analytics/zero-sms
Gerrit-Branch: master
Gerrit-Owner: Yurik <yu...@wikimedia.org>