ArielGlenn has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/215671

Change subject: dumps: do xml page logs via streaming
......................................................................

dumps: do xml page logs via streaming

as with the xml stubs streaming, this works around php memory
leaks by dumping small ranges and feeding them to the
compressor for final output

removed 'batchsize' configuration option for page log dumps
as it's no longer needed.

Change-Id: Ieeb5a38a62a3aeece64e1420f66002dddc740b2e
---
M xmldumps-backup/WikiDump.py
M xmldumps-backup/wikidump.conf.sample
M xmldumps-backup/worker.py
A xmldumps-backup/xmllogs.py
4 files changed, 153 insertions(+), 90 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/dumps 
refs/changes/71/215671/1

diff --git a/xmldumps-backup/WikiDump.py b/xmldumps-backup/WikiDump.py
index 84c2572..19eee24 100644
--- a/xmldumps-backup/WikiDump.py
+++ b/xmldumps-backup/WikiDump.py
@@ -237,9 +237,6 @@
                        "checkpointTime" : "0",
                        #"otherformats": {
                        "multistream" : "0",
-                       # "pageslogging" : {
-                       # number of rows to request in a single query, default 
is no batch, do them all
-                       "loggingBatchsize" : "0",
                        }
                self.conf = ConfigParser.SafeConfigParser(defaults)
                self.conf.read(self.files)
@@ -359,10 +356,6 @@
                if not self.conf.has_section('cleanup'):
                        self.conf.add_section('cleanup')
                self.keep = self.conf.getint("cleanup", "keep")
-
-               if not self.conf.has_section('pageslogging'):
-                       self.conf.add_section('pageslogging')
-               self.loggingBatchsize = 
self.conf.getint("pageslogging","batchsize")
 
        def parseConfFilePerProject(self, projectName = False):
                # we need to read from the project section without falling back
diff --git a/xmldumps-backup/wikidump.conf.sample 
b/xmldumps-backup/wikidump.conf.sample
index 43669ba..b97c184 100644
--- a/xmldumps-backup/wikidump.conf.sample
+++ b/xmldumps-backup/wikidump.conf.sample
@@ -44,6 +44,3 @@
 chunksEnabled=1
 pagesPerChunkHistory=10000,50000,50000,50000,50000
 pagesPerChunkAbstract=100000,100000
-
-[pageslogging]
-batchsize=1000
diff --git a/xmldumps-backup/worker.py b/xmldumps-backup/worker.py
index fe0c9a1..a4703a7 100644
--- a/xmldumps-backup/worker.py
+++ b/xmldumps-backup/worker.py
@@ -2954,25 +2954,6 @@
        def getFileExt(self):
                return "gz"
 
-       def getMaxLogID(self, runner):
-               dbServerInfo = DbServerInfo(runner.wiki, runner.dbName)
-               query = "select MAX(log_id) from %slogging;" % 
dbServerInfo.dBTablePrefix
-               results = None
-               retries = 0
-               maxretries = 5
-               results = dbServerInfo.runSqlAndGetOutput(query)
-               while (results == None and retries < maxretries):
-                       retries = retries + 1
-                       time.sleep(5)
-                       results = dbServerInfo.runSqlAndGetOutput(query)
-               if (not results):
-                       return None
-               lines = results.splitlines()
-               if (lines and lines[1]):
-                       return int(lines[1])
-               else:
-                       return None
-
        def getTempFilename(self, name, number):
                return name + "-" + str(number)
 
@@ -2986,68 +2967,16 @@
                        raise BackupError("php command %s not found" % 
runner.wiki.config.php)
                scriptCommand = 
MultiVersion.MWScriptAsArray(runner.wiki.config, "dumpBackup.php")
 
-               # do logging table in batches to avoid taking days to dump 
(wikidata for example)
-               maxLogId = self.getMaxLogID(runner)
-               if not maxLogId:
-                       raise BackupError("error retrieving max id from logging 
table")
+                logging = runner.dumpDir.filenamePublicPath(outputFileObj)
 
-               batchsize = runner.wiki.config.loggingBatchsize
-               if batchsize:
-                       startId = 0
-                       tempFiles = []
-                       tempFileObjs = []
-                       while startId < maxLogId:
-                               endId = startId + batchsize
-                               fileObjThisBatch = DumpFilename(runner.wiki, 
outputFileObj.date, self.getTempFilename(outputFileObj.dumpName,startId), 
outputFileObj.fileType, outputFileObj.fileExt)
-                               tempFileObjs.append(fileObjThisBatch)
-                               logging = 
runner.dumpDir.filenamePublicPath(fileObjThisBatch)
-                               tempFiles.append(logging)
-                               command = [ "%s" % runner.wiki.config.php, "-q" 
]
-                               command.extend(scriptCommand)
-                               command.extend(["--wiki=%s" % runner.dbName,
-                                               "--logs", "--report=10000",
-                                               "%s" % 
runner.forceNormalOption(),
-                                               "--start=%s" % startId,
-                                               "--end=%s" % endId,
-                                               "--output=gzip:%s" % logging ])
-                               pipeline = [ command ]
-                               series = [ pipeline ]
-                               error = runner.runCommand([ series ], 
callbackStderr=self.progressCallback,
-                                                 callbackStderrArg=runner)
-                               if (error):
-                                       raise BackupError("error dumping log 
files")
-                               startId = endId
-                       # recombine these now
-                       if (not exists( runner.wiki.config.gzip ) ):
-                               raise BackupError("gzip command %s not found" % 
runner.wiki.config.gzip)
-                       compressionCommand = runner.wiki.config.gzip
-                       compressionCommand = "%s > " % runner.wiki.config.gzip
-                       uncompressionCommand = [ "%s" % 
runner.wiki.config.gzip, "-dc" ]
-                       recombineCommandString = 
self.buildRecombineCommandString(runner, tempFileObjs, outputFileObj, 
compressionCommand, uncompressionCommand )
-                       recombineCommand = [ recombineCommandString ]
-                       recombinePipeline = [ recombineCommand ]
-                       series = [ recombinePipeline ]
-                       result = runner.runCommand([ series ], 
callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
-                       if result:
-                               error = result
-                       if (error):
-                               raise BackupError("error recombining 
pages-logging files")
-                       # clean up those intermediate files now
-                       for f in tempFiles:
-                               os.remove(f)
-               else:
-                       logging = 
runner.dumpDir.filenamePublicPath(outputFileObj)
-                       command = [ "%s" % runner.wiki.config.php, "-q" ]
-                       command.extend(scriptCommand)
-                       command.extend(["--wiki=%s" % runner.dbName,
-                                       "--logs", "--report=10000",
-                                       "%s" % runner.forceNormalOption(),
-                                       "--output=gzip:%s" % logging ])
-                       pipeline = [ command ]
-                       series = [ pipeline ]
-                       error = runner.runCommand([ series ], 
callbackStderr=self.progressCallback, callbackStderrArg=runner)
-                       if (error):
-                               raise BackupError("error dmping log files")
+                command = [ "/usr/bin/python", "xmllogs.py", "--config", 
runner.wiki.config.files[0], "--wiki", runner.dbName,
+                            runner.forceNormalOption(), "--outfile", logging ]
+
+                pipeline = [ command ]
+                series = [ pipeline ]
+                error = runner.runCommand([ series ], 
callbackStderr=self.progressCallback, callbackStderrArg=runner)
+                if (error):
+                        raise BackupError("error dumping log files")
 
 class XmlDump(Dump):
        """Primary XML dumps, one section at a time."""
diff --git a/xmldumps-backup/xmllogs.py b/xmldumps-backup/xmllogs.py
new file mode 100644
index 0000000..32c8538
--- /dev/null
+++ b/xmldumps-backup/xmllogs.py
@@ -0,0 +1,144 @@
+'''
+generate an xml dump via multiple runs of a php script instead of one
+long run.
+
+avoids memory leak issues, permits retries when a single run fails,
+recovery if db servers go away in the middle of a run by retrying
+the run.
+'''
+
+import os
+import sys
+import time
+import worker
+import WikiDump
+import getopt
+from xmlstreams import run_script, catfile, gzippit, get_max_id, do_xml_piece, 
do_xml_stream
+
+
+def dologsbackup(wikidb, outfile,
+                 wikiconf, force_normal, start, end, dryrun):
+    '''
+    do a logs xml dump one piece at a time, writing into uncompressed
+    temporary files and shovelling those into gzip's stdin for the
+    concatenated compressed output
+    '''
+    outfiles = {'logs': {'name': outfile}}
+    for filetype in outfiles:
+        outfiles[filetype]['temp'] = os.path.join(wikiconf.tempDir, 
os.path.basename(outfiles[filetype]['name']) + "_tmp")
+        outfiles[filetype]['compr'] = gzippit(outfiles[filetype]['name'])
+
+    script_command = worker.MultiVersion.MWScriptAsArray(wikiconf, 
"dumpBackup.php")
+    command = [wikiconf.php, "-q"] + script_command
+
+    command.extend(["--wiki=%s" % wikidb,
+                    "--logs", "--report=1000",
+                    "--output=file:%s" % outfiles['logs']['temp']
+                    ])
+    if force_normal:
+        command.append("--force-normal")
+
+    do_xml_stream(wikidb, outfiles, command, wikiconf, force_normal,
+                  start, end, dryrun, 'log_id', 'logging',
+                  50000, 100000, '</logitem>\n')
+
+
+def usage(message=None):
+    """
+    display a helpful usage message with
+    an optional introductory message first
+    """
+    if message is not None:
+        sys.stderr.write(message)
+        sys.stderr.write("\n")
+    usage_message = """
+Usage: xmllogs.py --wiki wikidbname --outfile path
+    [--start number] [--end number]
+    [--force-normal] [--config path] [--dryrun]
+
+Options:
+
+  --wiki (-w):         wiki db name, e.g. enwiki
+  --outfile (-o):      full path to xml logs dump that will be created
+
+  --start (-s):        starting log id to dump (default: 1)
+  --end (-e):          ending log id to dump (default: dump all)
+
+  --force-normal (-f): if set, this argument will be passed through to 
dumpBackup.php
+                       (default: unset)
+  --config (-C):       path to wikidump configfile (default: "wikidump.conf" 
in current dir)
+  --dryrun (-d):       display the commands that would be run to produce the 
output but
+                       don't actually run them
+"""
+    sys.stderr.write(usage_message)
+    sys.exit(1)
+
+
+def main():
+    'main entry point, does all the work'
+    wiki = None
+    output_file = None
+    start = None
+    end = None
+    force_normal = False
+    configfile = "wikidump.conf"
+    dryrun = False
+
+    try:
+        (options, remainder) = getopt.gnu_getopt(
+            sys.argv[1:], "w:o:s:e:C:dfhv",
+            ["wiki=", "outfile=",
+             "start=", "end=", "config=", "force-normal",
+             "help", "dryrun"])
+
+    except getopt.GetoptError as err:
+        usage("Unknown option specified: " + str(err))
+    for (opt, val) in options:
+        if opt in ["-w", "--wiki"]:
+            wiki = val
+        elif opt in ["-o", "--outfile"]:
+            output_file = val
+        elif opt in ["-s", "--start"]:
+            start = val
+        elif opt in ["-e", "--end"]:
+            end = val
+        elif opt in ["-f", "--force-normal"]:
+            force_normal = True
+        elif opt in ["-C", "--config"]:
+            configfile = val
+        elif opt in ["-d", "--dryrun"]:
+            dryrun = True
+        elif opt in ["-h", "--help"]:
+            usage('Help for this script\n')
+        else:
+            usage("Unknown option specified: <%s>" % opt)
+
+    if len(remainder) > 0:
+        usage("Unknown option(s) specified: <%s>" % remainder[0])
+
+    if wiki is None:
+        usage("mandatory argument missing: --wiki")
+    if output_file is None:
+        usage("mandatory argument missing: --outfile")
+
+    if start is not None:
+        if not start.isdigit():
+            usage("value for --start must be a number")
+        else:
+            start = int(start)
+
+    if end is not None:
+        if not end.isdigit():
+            usage("value for --end must be a number")
+        else:
+            end = int(end)
+
+    if not os.path.exists(configfile):
+        usage("no such file found: " + configfile)
+
+    wikiconf = WikiDump.Config(configfile)
+    wikiconf.parseConfFilePerProject(wiki)
+    dologsbackup(wiki, output_file, wikiconf, force_normal, start, end, dryrun)
+
+if __name__ == '__main__':
+    main()

-- 
To view, visit https://gerrit.wikimedia.org/r/215671
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ieeb5a38a62a3aeece64e1420f66002dddc740b2e
Gerrit-PatchSet: 1
Gerrit-Project: operations/dumps
Gerrit-Branch: ariel
Gerrit-Owner: ArielGlenn <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to