Yurik has submitted this change and it was merged.

Change subject: Updated settings storage
Updated settings storage Change-Id: I63f6b411816f1fae82a694eec4e711c011d0571c --- M download.py A run.cmd 2 files changed, 78 insertions(+), 77 deletions(-) Approvals: Yurik: Verified; Looks good to me, approved diff --git a/download.py b/download.py index c12ebab..7a74d26 100644 --- a/download.py +++ b/download.py @@ -14,8 +14,6 @@ import itertools import generate -import private - def generatePassword(size=10, chars=string.ascii_letters + string.digits): """ Adapted from @@ -38,29 +36,29 @@ self.settingsFilePath = os.path.join(self.workDir, settingsFile) self.combinedFilePath = os.path.join(self.workDir, 'combined.tsv') - self.tempFilePath = os.path.join(self.workDir, 'temp.tsv') - if os.path.exists(self.tempFilePath): os.remove(self.tempFilePath) self.statsFilePath = os.path.join(self.workDir, 'combined.json') - if os.path.exists(self.statsFilePath): os.remove(self.statsFilePath) data = self.loadState() - + self.awsBucket = data['awsBucket'] if 'awsBucket' in data else generatePassword() + self.awsKeyId = data['awsKeyId'] if 'awsKeyId' in data else generatePassword() + self.awsSecret = data['awsSecret'] if 'awsSecret' in data else generatePassword() + self.awsUser = data['awsUser'] if 'awsUser' in data else generatePassword() + self.downloadOverlapDays = data['downloadOverlapDays'] if 'downloadOverlapDays' in data else False self.enableDownload = data['enableDownload'] if 'enableDownload' in data else True self.enableDownloadOld = data['enableDownloadOld'] if 'enableDownloadOld' in data else True self.lastDownloadTs = self.parseDate(data['lastDownloadTs']) if 'lastDownloadTs' in data else False - self.downloadOverlapDays = data['downloadOverlapDays'] if 'downloadOverlapDays' in data else False - self.lastProcessedTs = self.parseDate(data['lastProcessedTs']) if 'lastProcessedTs' in data else False - self.processOverlapDays = data['processOverlapDays'] if 'processOverlapDays' in data else 1 - self.smtpHost = data['smtpHost'] if 'smtpHost' in data else False - 
self.smtpFrom = data['smtpFrom'] if 'smtpFrom' in data else False - self.smtpTo = data['smtpTo'] if 'smtpTo' in data else False - self.lastErrorTs = self.parseDate(data['lastErrorTs']) if 'lastErrorTs' in data else False self.lastErrorMsg = data['lastErrorMsg'] if 'lastErrorMsg' in data else False - self.sortCmd = data['sortCmd'] if 'sortCmd' in data else 'sort' + self.lastErrorTs = self.parseDate(data['lastErrorTs']) if 'lastErrorTs' in data else False self.lastGoodRunTs = self.parseDate(data['lastGoodRunTs']) if 'lastGoodRunTs' in data else False - self.partnerMap = data['partnerMap'] if 'partnerMap' in data else {} + self.lastProcessedTs = self.parseDate(data['lastProcessedTs']) if 'lastProcessedTs' in data else False self.partnerDirMap = data['partnerDirMap'] if 'partnerDirMap' in data else {} + self.partnerMap = data['partnerMap'] if 'partnerMap' in data else {} + self.processOverlapDays = data['processOverlapDays'] if 'processOverlapDays' in data else 1 self.salt = data['salt'] if 'salt' in data else generatePassword() + self.smtpFrom = data['smtpFrom'] if 'smtpFrom' in data else False + self.smtpHost = data['smtpHost'] if 'smtpHost' in data else False + self.smtpTo = data['smtpTo'] if 'smtpTo' in data else False + self.sortCmd = data['sortCmd'] if 'sortCmd' in data else 'sort' if self.downloadOverlapDays and self.lastDownloadTs: self.downloadIfAfter = self.lastDownloadTs - timedelta(days=self.downloadOverlapDays) @@ -83,22 +81,26 @@ def saveState(self): data = self.loadState() + data['awsBucket'] = self.awsBucket + data['awsKeyId'] = self.awsKeyId + data['awsSecret'] = self.awsSecret + data['awsUser'] = self.awsUser + data['downloadOverlapDays'] = int(self.downloadOverlapDays) if self.downloadOverlapDays else False data['enableDownload'] = self.enableDownload data['enableDownloadOld'] = self.enableDownloadOld data['lastDownloadTs'] = self.formatDate(self.lastDownloadTs) - data['downloadOverlapDays'] = int(self.downloadOverlapDays) if self.downloadOverlapDays 
else False - data['lastProcessedTs'] = self.formatDate(self.lastProcessedTs) - data['processOverlapDays'] = int(self.processOverlapDays) if self.processOverlapDays else False - data['smtpHost'] = self.smtpHost - data['smtpFrom'] = self.smtpFrom - data['smtpTo'] = self.smtpTo - data['lastErrorTs'] = self.formatDate(self.lastErrorTs) data['lastErrorMsg'] = self.lastErrorMsg - data['sortCmd'] = self.sortCmd + data['lastErrorTs'] = self.formatDate(self.lastErrorTs) data['lastGoodRunTs'] = self.formatDate(self.lastGoodRunTs) - data['partnerMap'] = self.partnerMap + data['lastProcessedTs'] = self.formatDate(self.lastProcessedTs) data['partnerDirMap'] = self.partnerDirMap + data['partnerMap'] = self.partnerMap + data['processOverlapDays'] = int(self.processOverlapDays) if self.processOverlapDays else False data['salt'] = self.salt + data['smtpFrom'] = self.smtpFrom + data['smtpHost'] = self.smtpHost + data['smtpTo'] = self.smtpTo + data['sortCmd'] = self.sortCmd stateBk = self.settingsFilePath + '.bak' with open(stateBk, 'wb') as f: @@ -118,9 +120,11 @@ return self.parseDate(m.group('date')) if m else False def download(self): - cn = S3Connection(private.aws_access_key_id, private.aws_secret_access_key) + print '\nDownloading files' + + cn = S3Connection(self.awsKeyId, self.awsSecret) prefix = 'prd-vumi-wikipedia.aws.prk-host.net/' - bucket = cn.get_bucket(private.bucket_name) + bucket = cn.get_bucket(self.awsBucket) files = bucket.list(prefix) for key in files: @@ -168,16 +172,20 @@ print 'Combining files into %s' % self.combinedFilePath print 'Processing %s' % (('files on or after %s' % self.processIfAfter) if self.processIfAfter else 'all files') - with io.open(self.combinedFilePath, 'a', encoding='utf8') as dst: - totalCount = 0 + + appendingDataFile = self.combinedFilePath + '.tmp' + manualLogRe = re.compile(ur'^wikipedia_application_\d+\.log\.\d+\.gz\:') + + totalCount = 0 + with io.open(appendingDataFile, 'w', encoding='utf8') as dst: for srcFile in sourceFiles: 
fileDate = self.getFileDate(srcFile) if self.processIfAfter: if not fileDate: - continue # old style filename, and the processIfAfter is set + continue # old style filename, and the processIfAfter is set elif fileDate <= self.processIfAfter: - continue # we have already processed this file + continue # we have already processed this file srcFilePath = os.path.join(self.dataDir, srcFile) if not os.path.isfile(srcFilePath): @@ -191,7 +199,8 @@ if count == 1 or totalCount % 30000 == 0: print('File %s, line %d, total lines %d' % (srcFile, count-1, totalCount-1)) - l = line.strip('\n\r') + l = line.strip(u'\n\r') + l = manualLogRe.sub( '', l, 1 ) if u' WIKI\t' in l: self.writeLine(dst, last) last = l @@ -204,6 +213,36 @@ self.writeLine(dst, last) if fileDate and (not self.lastProcessedTs or self.lastProcessedTs < fileDate): self.lastProcessedTs = fileDate + + if totalCount > 0: + # Sort files into one + sortedOutputFile = self.combinedFilePath + '.out' + if os.path.exists(sortedOutputFile): os.remove(sortedOutputFile) + + args = [self.sortCmd, '-u', '-o', sortedOutputFile, appendingDataFile] + originalExists = os.path.exists(self.combinedFilePath) + if originalExists: + args.append(self.combinedFilePath) + cmd = ' '.join([pipes.quote(v) for v in args]) + print('\nSorting: %s' % cmd) + try: + tmp2 = sortedOutputFile + '2' + if os.path.exists(tmp2): + os.remove(tmp2) + + subprocess.check_output(args, stderr=subprocess.STDOUT) + + # Extra safety - keep old file until we rename temp to its name + if originalExists: + os.rename(self.combinedFilePath, tmp2) + os.rename(sortedOutputFile, self.combinedFilePath) + if originalExists: + os.remove(tmp2) + + except subprocess.CalledProcessError, ex: + raise Exception(u'Error %s running %s\nOutput:\n%s' % (ex.returncode, cmd, ex.output)) + + os.remove(appendingDataFile) def writeLine(self, dst, line): if not line: @@ -229,26 +268,6 @@ parts[6] = parts[6].replace('\0', '\\0') dst.write('\t'.join(parts) + '\n') - - def sort(self): - - 
args = [self.sortCmd, '-u', '-o', self.tempFilePath, self.combinedFilePath] - cmd = ' '.join([pipes.quote(v) for v in args]) - print('\nSorting: %s' % cmd) - try: - tmp2 = self.tempFilePath + '2' - if os.path.exists(tmp2): - os.remove(tmp2) - - subprocess.check_output(args, stderr=subprocess.STDOUT) - - # Extra safety - keep old file until we rename temp to its name - os.rename(self.combinedFilePath, tmp2) - os.rename(self.tempFilePath, self.combinedFilePath) - os.remove(tmp2) - - except subprocess.CalledProcessError, ex: - raise Exception(u'Error %s running %s\nOutput:\n%s' % (ex.returncode, cmd, ex.output)) def error(self, error): self.lastErrorTs = datetime.now() @@ -277,24 +296,6 @@ s.quit() def generateGraphData(self, skipParsing=False): - # if ((parts[2] == 'airtel_ke_ussd_transport' and parts[3] == 'ussd' and parts[4] == 'airtel') or - # (parts[2] == 'airtel_ke_sms_transport' and parts[3] == 'sms' and parts[4] == '')): - # parts[2:5] = ['airtel'] - # elif ((parts[2] == 'vumi_starcode_transport' and parts[3] == 'ussd' and parts[4] == '') or - # (parts[2] == 'smpp_transport' and parts[3] == 'sms' and parts[4] == '')): - # parts[2:5] = ['vumi'] - # elif parts[2] == 'zambia_cellulant_ussd_transport' and parts[3] == 'ussd' and parts[4] == '': - # parts[2:5] = ['zambia-cellulant'] - # elif parts[2] == 'ambient_go_smpp_transport' and parts[3] == 'sms' and parts[4] == '': - # parts[2:5] = ['ambient_go'] - # elif ((parts[2] == 'truteq_8864_transport' or parts[2] == 'truteq_32323_transport') and parts[3] == 'ussd' and - # parts[4] == ''): - # parts[2:5] = ['truteq'] - # elif parts[2] == 'equity_kenya_ussd_smpp_transport' and parts[3] == 'ussd' and parts[4] == '': - # parts[2:5] = ['equity_ke'] - # else: - # raise BaseException(line) - stats = generate.Stats(self.combinedFilePath, self.graphDir, self.statsFilePath, self.partnerMap, self.partnerDirMap, self.salt) if not skipParsing: @@ -312,14 +313,13 @@ def run(self): # noinspection PyBroadException try: - # if 
self.enableDownload: - # self.download() - # files = os.listdir(self.dataDir) - # # files = itertools.chain([os.path.join('pc', f) for f in os.listdir(os.path.join(self.dataDir, 'pc'))], files) - # self.combineDataFiles(files) - # self.sort() + if self.enableDownload: + self.download() + files = os.listdir(self.dataDir) + # files = itertools.chain([os.path.join('pc', f) for f in os.listdir(os.path.join(self.dataDir, 'pc'))], files) + self.combineDataFiles(files) - self.generateGraphData(True) + self.generateGraphData() self.lastGoodRunTs = datetime.now() except: self.error(traceback.format_exc()) @@ -328,4 +328,4 @@ if __name__ == "__main__": prc = Processor() - prc.run() \ No newline at end of file + prc.run() diff --git a/run.cmd b/run.cmd new file mode 100644 index 0000000..d438060 --- /dev/null +++ b/run.cmd @@ -0,0 +1 @@ +c:\Python27\python.exe download.py -- To view, visit https://gerrit.wikimedia.org/r/150409 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I63f6b411816f1fae82a694eec4e711c011d0571c Gerrit-PatchSet: 1 Gerrit-Project: analytics/zero-sms Gerrit-Branch: master Gerrit-Owner: Yurik <yu...@wikimedia.org> Gerrit-Reviewer: Yurik <yu...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits