ArielGlenn has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/394857 )
Change subject: ability to do xmlpageslogging several pieces at a time in parallel
......................................................................
ability to do xmlpageslogging several pieces at a time in parallel
Bug: T181935
Change-Id: Icef0aa23c363d7fa4d3b09074571f02a9ed2d3c6
---
M xmldumps-backup/defaults.conf
M xmldumps-backup/dumps/WikiDump.py
M xmldumps-backup/dumps/recombinejobs.py
M xmldumps-backup/dumps/runner.py
M xmldumps-backup/dumps/runnerutils.py
M xmldumps-backup/dumps/utils.py
M xmldumps-backup/dumps/xmljobs.py
M xmldumps-backup/xmlstreams.py
8 files changed, 157 insertions(+), 15 deletions(-)
Approvals:
ArielGlenn: Looks good to me, approved
jenkins-bot: Verified
diff --git a/xmldumps-backup/defaults.conf b/xmldumps-backup/defaults.conf
index e95ac47..222cb2d 100644
--- a/xmldumps-backup/defaults.conf
+++ b/xmldumps-backup/defaults.conf
@@ -60,6 +60,8 @@
pagesPerChunkHistory=0
revsPerChunkHistory=0
pagesPerChunkAbstract=0
+chunksForPagelogs=0
+logitemsPerPagelogs=0
jobsperbatch=""
revsPerJob=1000000
maxRetries=0
diff --git a/xmldumps-backup/dumps/WikiDump.py b/xmldumps-backup/dumps/WikiDump.py
index 8ba3838..005f858 100644
--- a/xmldumps-backup/dumps/WikiDump.py
+++ b/xmldumps-backup/dumps/WikiDump.py
@@ -282,6 +282,10 @@
"chunks", "chunksForAbstract", 0)
self.pages_per_filepart_abstract = self.get_opt_for_proj_or_default(
"chunks", "pagesPerChunkAbstract", 0)
+ self.numparts_for_pagelogs = self.get_opt_for_proj_or_default(
+ "chunks", "chunksForPagelogs", 0)
+ self.logitems_per_filepart_pagelogs = self.get_opt_for_proj_or_default(
+ "chunks", "logitemsPerPagelogs", 0)
self.recombine_history = self.get_opt_for_proj_or_default(
"chunks", "recombineHistory", 1)
self.checkpoint_time = self.get_opt_for_proj_or_default(
diff --git a/xmldumps-backup/dumps/recombinejobs.py b/xmldumps-backup/dumps/recombinejobs.py
index 6d1f0a1..929f23a 100644
--- a/xmldumps-backup/dumps/recombinejobs.py
+++ b/xmldumps-backup/dumps/recombinejobs.py
@@ -329,3 +329,56 @@
error = result
if error:
raise BackupError("error recombining abstract dump files")
+
+
+class RecombineXmlLoggingDump(RecombineDump):
+ def __init__(self, name, desc, item_for_recombine):
+ # no partnum_todo, no parts generally (False, False), even though input may have it
+ self.item_for_recombine = item_for_recombine
+ self._prerequisite_items = [self.item_for_recombine]
+ super(RecombineXmlLoggingDump, self).__init__(name, desc)
+
+ def get_filetype(self):
+ return self.item_for_recombine.get_filetype()
+
+ def get_file_ext(self):
+ return self.item_for_recombine.get_file_ext()
+
+ def get_dumpname(self):
+ return self.item_for_recombine.get_dumpname()
+
+ def build_command(self, runner, to_recombine_dfnames, output_dfname):
+ input_dfnames = []
+ for in_dfname in to_recombine_dfnames:
+ if in_dfname.dumpname == output_dfname.dumpname:
+ input_dfnames.append(in_dfname)
+ if not len(input_dfnames):
+ self.set_status("failed")
+ raise BackupError("No input files for %s found" % self.name())
+ if not exists(runner.wiki.config.gzip):
+ raise BackupError("gzip command %s not found" % runner.wiki.config.gzip)
+ compression_command = "%s > " % runner.wiki.config.gzip
+ uncompression_command = ["%s" % runner.wiki.config.gzip, "-dc"]
+ recombine_command_string = self.build_recombine_command_string(
+ runner, input_dfnames, output_dfname, compression_command,
+ uncompression_command)
+ recombine_command = [recombine_command_string]
+ recombine_pipeline = [recombine_command]
+ series = [recombine_pipeline]
+ return series
+
+ def run(self, runner):
+ error = 0
+ to_recombine_dfnames = self.item_for_recombine.list_outfiles_for_input(runner.dump_dir)
+ output_dfnames = self.list_outfiles_for_build_command(runner.dump_dir)
+ for output_dfname in output_dfnames:
+ command_series = self.build_command(runner, to_recombine_dfnames, output_dfname)
+ self.setup_command_info(runner, command_series, [output_dfname])
+ result, broken = runner.run_command(
+ [command_series], callback_timed=self.progress_callback,
+ callback_timed_arg=runner, shell=True,
+ callback_on_completion=self.command_completion_callback)
+ if result:
+ error = result
+ if error:
+ raise BackupError("error recombining log event files")
diff --git a/xmldumps-backup/dumps/runner.py b/xmldumps-backup/dumps/runner.py
index 662f581..e0dfaca 100644
--- a/xmldumps-backup/dumps/runner.py
+++ b/xmldumps-backup/dumps/runner.py
@@ -13,7 +13,7 @@
from dumps.apijobs import SiteInfoDump
from dumps.tablesjobs import PrivateTable, PublicTable, TitleDump, AllTitleDump
from dumps.recombinejobs import RecombineAbstractDump, RecombineXmlDump
-from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump
+from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump, RecombineXmlLoggingDump
from dumps.xmljobs import XmlLogging, XmlStub, AbstractDump
from dumps.xmlcontentjobs import XmlDump, BigXmlDump
from dumps.recompressjobs import XmlMultiStreamDump, XmlRecompressDump
@@ -266,7 +266,14 @@
self.find_item_by_name('metacurrentdump')))
self.dump_items.append(
- XmlLogging("Log events to all pages and users."))
+ XmlLogging("Log events to all pages and users.",
+ self._get_partnum_todo("xmlpagelogsdump"),
+ get_int_setting(self.jobsperbatch, "xmlpagelogsdump"),
+ self.filepart.get_logitems_per_filepart_pagelogs()))
+
+ self.append_job_if_needed(RecombineXmlLoggingDump(
+ "xmlpagelogsdumprecombine", "Recombine Log events to all pages and users",
+ self.find_item_by_name('xmlpagelogsdump')))
self.append_job_if_needed(
FlowDump("xmlflowdump", "content of flow pages in xml format"))
diff --git a/xmldumps-backup/dumps/runnerutils.py b/xmldumps-backup/dumps/runnerutils.py
index 10b0295..12a77a5 100644
--- a/xmldumps-backup/dumps/runnerutils.py
+++ b/xmldumps-backup/dumps/runnerutils.py
@@ -841,6 +841,7 @@
self.wiki.config.pages_per_filepart_history,
self.wiki.config.revs_per_filepart_history,
self.wiki.config.numparts_for_abstract,
+ self.wiki.config.numparts_for_pagelogs,
self.wiki.config.pages_per_filepart_abstract,
self.wiki.config.recombine_history,
self.wiki.config.checkpoint_time]
@@ -894,9 +895,10 @@
self.wiki.config.pages_per_filepart_history = settings[1]
self.wiki.config.revs_per_filepart_history = settings[2]
self.wiki.config.numparts_for_abstract = settings[3]
- self.wiki.config.pages_per_filepart_abstract = settings[4]
- self.wiki.config.recombine_history = settings[5]
- self.wiki.config.checkpoint_time = settings[6]
+ self.wiki.config.numparts_for_pagelogs = settings[4]
+ self.wiki.config.pages_per_filepart_abstract = settings[5]
+ self.wiki.config.recombine_history = settings[6]
+ self.wiki.config.checkpoint_time = settings[7]
class DumpRunJobData(object):
diff --git a/xmldumps-backup/dumps/utils.py b/xmldumps-backup/dumps/utils.py
index 9c2f040..2bdc291 100644
--- a/xmldumps-backup/dumps/utils.py
+++ b/xmldumps-backup/dumps/utils.py
@@ -463,6 +463,7 @@
class PageAndEditStats(object):
def __init__(self, wiki, db_name, error_callback=None):
self.total_pages = None
+ self.total_logitems = None
self.total_edits = None
self.wiki = wiki
self.db_name = db_name
@@ -487,6 +488,7 @@
lines = results.splitlines()
if lines and lines[1]:
self.total_pages = int(lines[1])
+
query = "select MAX(rev_id) from %srevision;" % self.db_server_info.db_table_prefix
retries = 0
results = None
@@ -501,10 +503,29 @@
lines = results.splitlines()
if lines and lines[1]:
self.total_edits = int(lines[1])
+
+ query = "select MAX(log_id) from %slogging;" % self.db_server_info.db_table_prefix
+ retries = 0
+ results = None
+ results = self.db_server_info.run_sql_and_get_output(query)
+ while results is None and retries < maxretries:
+ retries = retries + 1
+ time.sleep(5)
+ results = self.db_server_info.run_sql_and_get_output(query)
+ if not results:
+ return 1
+
+ lines = results.splitlines()
+ if lines and lines[1]:
+ self.total_logitems = int(lines[1])
+
return 0
def get_total_pages(self):
return self.total_pages
+
+ def get_total_logitems(self):
+ return self.total_logitems
def get_total_edits(self):
return self.total_edits
@@ -548,6 +569,17 @@
self._pages_per_filepart_abstract = self.convert_comma_sep(
self.wiki.config.pages_per_filepart_abstract)
+ if (self.wiki.config.numparts_for_pagelogs and
+ self.wiki.config.numparts_for_pagelogs != "0"):
+ # we add 200 padding to cover new log entries that may be added
+ logitems_per_filepart = 200 + self.stats.total_logitems / int(
+ self.wiki.config.numparts_for_pagelogs)
+ self._logitems_per_filepart_pagelogs = [logitems_per_filepart for i in range(
+ 0, int(self.wiki.config.numparts_for_pagelogs))]
+ else:
+ self._logitems_per_filepart_pagelogs = self.convert_comma_sep(
+ self.wiki.config.logitems_per_filepart_pagelogs)
+
self._pages_per_filepart_history = self.convert_comma_sep(
self.wiki.config.pages_per_filepart_history)
self._revs_per_filepart_history = self.convert_comma_sep(
@@ -557,6 +589,7 @@
self._pages_per_filepart_history = False
self._revs_per_filepart_history = False
self._pages_per_filepart_abstract = False
+ self._logitems_per_filepart_pagelogs = False
self._recombine_history = False
if self._parts_enabled:
if self._revs_per_filepart_history:
@@ -595,6 +628,18 @@
else:
self._num_parts_abstract = 0
+ if self._logitems_per_filepart_pagelogs:
+ if (len(self._logitems_per_filepart_pagelogs) == 1 and
+ self._logitems_per_filepart_pagelogs[0]):
+ self._num_parts_pagelogs = self.get_num_parts_for_xml_dumps(
+ self.stats.total_logitems, self._logitems_per_filepart_pagelogs[0])
+ self._logitems_per_filepart_pagelogs = [self._logitems_per_filepart_pagelogs[0]
+ for i in range(self._num_parts_pagelogs)]
+ else:
+ self._num_parts_pagelogs = len(self._logitems_per_filepart_pagelogs)
+ else:
+ self._num_parts_pagelogs = 0
+
def convert_comma_sep(self, line):
if line == "":
return False
@@ -608,6 +653,9 @@
def get_pages_per_filepart_abstract(self):
return self._pages_per_filepart_abstract
+ def get_logitems_per_filepart_pagelogs(self):
+ return self._logitems_per_filepart_pagelogs
+
def get_num_parts_abstract(self):
return self._num_parts_abstract
diff --git a/xmldumps-backup/dumps/xmljobs.py b/xmldumps-backup/dumps/xmljobs.py
index 4419d42..3a59c28 100644
--- a/xmldumps-backup/dumps/xmljobs.py
+++ b/xmldumps-backup/dumps/xmljobs.py
@@ -195,7 +195,13 @@
class XmlLogging(Dump):
""" Create a logging dump of all page activity """
- def __init__(self, desc, parts=False):
+ def __init__(self, desc, partnum_todo, jobsperbatch=None, parts=False):
+ self._partnum_todo = partnum_todo
+ self.jobsperbatch = jobsperbatch
+ self._parts = parts
+ if self._parts:
+ self._parts_enabled = True
+ self.onlyparts = True
Dump.__init__(self, "xmlpagelogsdump", desc)
def detail(self):
@@ -229,6 +235,19 @@
config_file_arg, "--wiki", runner.db_name,
"--outfile", DumpFilename.get_inprogress_name(logging_path)]
+ if output_dfname.partnum:
+ # set up start end end pageids for this piece
+ # note there is no item id 0 I guess. so we start with 1
+ start = sum([self._parts[i] for i in range(0, output_dfname.partnum_int - 1)]) + 1
+ startopt = "--start=%s" % start
+ # if we are on the last file part, we should get up to the last log item id,
+ # whatever that is.
+ command.append(startopt)
+ if output_dfname.partnum_int < len(self._parts):
+ end = sum([self._parts[i] for i in range(0, output_dfname.partnum_int)]) + 1
+ endopt = "--end=%s" % end
+ command.append(endopt)
+
pipeline = [command]
series = [pipeline]
return series
@@ -236,14 +255,19 @@
def run(self, runner):
self.cleanup_old_files(runner.dump_dir, runner)
dfnames = self.list_outfiles_for_build_command(runner.dump_dir)
- if len(dfnames) > 1:
- raise BackupError("logging table job wants to produce more than one output file")
- output_dfname = dfnames[0]
- command_series = self.build_command(runner, output_dfname)
- self.setup_command_info(runner, command_series, [output_dfname])
- error, broken = runner.run_command([command_series], callback_stderr=self.progress_callback,
- callback_stderr_arg=runner,
- callback_on_completion=self.command_completion_callback)
+ if self.jobsperbatch is not None:
+ maxjobs = self.jobsperbatch
+ else:
+ maxjobs = len(dfnames)
+ for batch in batcher(dfnames, maxjobs):
+ commands = []
+ for output_dfname in batch:
+ command_series = self.build_command(runner, output_dfname)
+ self.setup_command_info(runner, command_series, [output_dfname])
+ commands.append(command_series)
+ error, broken = runner.run_command(commands, callback_stderr=self.progress_callback,
+ callback_stderr_arg=runner,
+ callback_on_completion=self.command_completion_callback)
if error:
raise BackupError("error dumping log files")
diff --git a/xmldumps-backup/xmlstreams.py b/xmldumps-backup/xmlstreams.py
index 6f4c1de..3163fba 100644
--- a/xmldumps-backup/xmlstreams.py
+++ b/xmldumps-backup/xmlstreams.py
@@ -42,7 +42,9 @@
if interval is None:
# hope this is not too awful a guess
interval = (int(end) - int(start)) / 50
- if interval > max_interval:
+ if interval == 0:
+ interval = 1
+ elif interval > max_interval:
interval = max_interval
interval_save = interval
--
To view, visit https://gerrit.wikimedia.org/r/394857
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Icef0aa23c363d7fa4d3b09074571f02a9ed2d3c6
Gerrit-PatchSet: 11
Gerrit-Project: operations/dumps
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <[email protected]>
Gerrit-Reviewer: ArielGlenn <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits