ArielGlenn has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/250433

Change subject: dumps: move Runner, DumpItemList out of command line script to 
module
......................................................................

dumps: move Runner, DumpItemList out of command line script to module

so they can be used by a couple other scripts

Change-Id: I313ca05d667bf0df9eb7a73022ec37aca6106e9b
---
M xmldumps-backup/dumpadmin.py
M xmldumps-backup/pagerange.py
M xmldumps-backup/worker.py
3 files changed, 5 insertions(+), 915 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/dumps 
refs/changes/33/250433/1

diff --git a/xmldumps-backup/dumpadmin.py b/xmldumps-backup/dumpadmin.py
index 053cefa..c467ad7 100644
--- a/xmldumps-backup/dumpadmin.py
+++ b/xmldumps-backup/dumpadmin.py
@@ -14,7 +14,7 @@
 from dumps.utils import RunInfoFile
 from dumps.runnerutils import NoticeFile
 from dumps.jobs import DumpDir
-from worker import Runner
+from dumps.runner import Runner
 from dumps.WikiDump import Wiki, Config, TimeUtils
 
 
diff --git a/xmldumps-backup/pagerange.py b/xmldumps-backup/pagerange.py
index dcdac5f..db5aab0 100644
--- a/xmldumps-backup/pagerange.py
+++ b/xmldumps-backup/pagerange.py
@@ -9,7 +9,7 @@
 import CommandManagement
 
 from CommandManagement import CommandPipeline, CommandSeries, 
CommandsInParallel
-from worker import Runner
+from dumps.runner import Runner
 
 class PageRange(object):
     """Methods for getting number of revisions per page, 
diff --git a/xmldumps-backup/worker.py b/xmldumps-backup/worker.py
index 9d4bf80..60be16a 100644
--- a/xmldumps-backup/worker.py
+++ b/xmldumps-backup/worker.py
@@ -3,922 +3,12 @@
 import getopt
 import os
 import sys
-import shutil
-import Queue
-import thread
-import traceback
 
 from os.path import exists
 from dumps.WikiDump import TimeUtils, Wiki, Config, cleanup
-from dumps.CommandManagement import CommandsInParallel
-from dumps.jobs import AbstractDump, AllTitleDump, BackupError, BigXmlDump
-from dumps.jobs import DumpDir, DumpFilename, PrivateTable, PublicTable
-from dumps.jobs import RecombineAbstractDump, RecombineXmlDump
-from dumps.jobs import RecombineXmlStub, RecombineXmlRecompressDump
-from dumps.jobs import TitleDump, XmlDump, XmlLogging, XmlMultiStreamDump
-from dumps.jobs import XmlRecompressDump, XmlStub
-from dumps.runnerutils import SymLinks, Feeds, NoticeFile
-from dumps.runnerutils import Checksummer, Status, Maintenance
-
-from dumps.utils import DbServerInfo, RunInfoFile, Chunk
-
-
-class Logger(object):
-
-    def __init__(self, log_filename=None):
-        if log_filename:
-            self.log_file = open(log_filename, "a")
-        else:
-            self.log_file = None
-        self.queue = Queue.Queue()
-        self.jobs_done = "JOBSDONE"
-
-    def log_write(self, line=None):
-        if self.log_file:
-            self.log_file.write(line)
-            self.log_file.flush()
-
-    def log_close(self):
-        if self.log_file:
-            self.log_file.close()
-
-    # return 1 if logging terminated, 0 otherwise
-    def do_job_on_log_queue(self):
-        line = self.queue.get()
-        if line == self.jobs_done:
-            self.log_close()
-            return 1
-        else:
-            self.log_write(line)
-            return 0
-
-    def add_to_log_queue(self, line=None):
-        if line:
-            self.queue.put_nowait(line)
-
-    # set in order to have logging thread clean up and exit
-    def indicate_jobs_done(self):
-        self.queue.put_nowait(self.jobs_done)
-
-
-class DumpItemList(object):
-    def __init__(self, wiki, prefetch, spawn, chunk_to_do, checkpoint_file,
-                 singleJob, skip_jobs, chunk_info, page_id_range, 
runinfo_file, dump_dir):
-        self.wiki = wiki
-        self._has_flagged_revs = self.wiki.has_flagged_revs()
-        self._has_wikidata = self.wiki.has_wikidata()
-        self._has_global_usage = self.wiki.has_global_usage()
-        self._is_wikidata_client = self.wiki.is_wikidata_client()
-        self._prefetch = prefetch
-        self._spawn = spawn
-        self.chunk_info = chunk_info
-        self.checkpoint_file = checkpoint_file
-        self._chunk_todo = chunk_to_do
-        self._single_job = singleJob
-        self.skip_jobs = skip_jobs
-        self._runinfo_file = runinfo_file
-        self.dump_dir = dump_dir
-        self.page_id_range = page_id_range
-
-        if self.wiki.config.checkpoint_time:
-            checkpoints = True
-        else:
-            checkpoints = False
-
-        if self._single_job and self._chunk_todo:
-            if (self._single_job[-5:] == 'table' or
-                    self._single_job[-9:] == 'recombine' or
-                    self._single_job == 'createdirs' or
-                    self._single_job == 'noop' or
-                    self._single_job == 'latestlinks' or
-                    self._single_job == 'xmlpagelogsdump' or
-                    self._single_job == 'pagetitlesdump' or
-                    self._single_job == 'allpagetitlesdump' or
-                    self._single_job.endswith('recombine')):
-                raise BackupError("You cannot specify a chunk with the job %s, 
exiting.\n"
-                                  % self._single_job)
-
-        if self._single_job and self.checkpoint_file is not None:
-            if (self._single_job[-5:] == 'table' or
-                    self._single_job[-9:] == 'recombine' or
-                    self._single_job == 'noop' or
-                    self._single_job == 'createdirs' or
-                    self._single_job == 'latestlinks' or
-                    self._single_job == 'xmlpagelogsdump' or
-                    self._single_job == 'pagetitlesdump' or
-                    self._single_job == 'allpagetitlesdump' or
-                    self._single_job == 'abstractsdump' or
-                    self._single_job == 'xmlstubsdump' or
-                    self._single_job.endswith('recombine')):
-                raise BackupError("You cannot specify a checkpoint file with 
the job %s, exiting.\n"
-                                  % self._single_job)
-
-        self.dump_items = [PrivateTable("user", "usertable", "User account 
data."),
-                           PrivateTable("watchlist", "watchlisttable",
-                                        "Users' watchlist settings."),
-                           PrivateTable("ipblocks", "ipblockstable",
-                                        "Data for blocks of IP addresses, 
ranges, and users."),
-                           PrivateTable("archive", "archivetable",
-                                        "Deleted page and revision data."),
-                           # PrivateTable("updates", "updatestable",
-                           #              "Update dataset for OAI updater 
system."),
-                           PrivateTable("logging", "loggingtable",
-                                        "Data for various events (deletions, 
uploads, etc)."),
-                           PrivateTable("oldimage", "oldimagetable",
-                                        "Metadata on prior versions of 
uploaded images."),
-                           # PrivateTable("filearchive", "filearchivetable",
-                           #             "Deleted image data"),
-
-                           PublicTable("site_stats", "sitestatstable",
-                                       "A few statistics such as the page 
count."),
-                           PublicTable("image", "imagetable",
-                                       "Metadata on current versions of 
uploaded media/files."),
-                           # PublicTable("oldimage", "oldimagetable",
-                           #            "Metadata on prior versions of 
uploaded media/files."),
-                           PublicTable("pagelinks", "pagelinkstable",
-                                       "Wiki page-to-page link records."),
-                           PublicTable("categorylinks", "categorylinkstable",
-                                       "Wiki category membership link 
records."),
-                           PublicTable("imagelinks", "imagelinkstable",
-                                       "Wiki media/files usage records."),
-                           PublicTable("templatelinks", "templatelinkstable",
-                                       "Wiki template inclusion link 
records."),
-                           PublicTable("externallinks", "externallinkstable",
-                                       "Wiki external URL link records."),
-                           PublicTable("langlinks", "langlinkstable",
-                                       "Wiki interlanguage link records."),
-                           # PublicTable("interwiki", "interwikitable",
-                           #            "Set of defined interwiki prefixes " +
-                           #            "and links for this wiki."),
-                           PublicTable("user_groups", "usergroupstable", "User 
group assignments."),
-                           PublicTable("category", "categorytable", "Category 
information."),
-
-                           PublicTable("page", "pagetable",
-                                       "Base per-page data (id, title, old 
restrictions, etc)."),
-                           PublicTable("page_restrictions", 
"pagerestrictionstable",
-                                       "Newer per-page restrictions table."),
-                           PublicTable("page_props", "pagepropstable",
-                                       "Name/value pairs for pages."),
-                           PublicTable("protected_titles", 
"protectedtitlestable",
-                                       "Nonexistent pages that have been 
protected."),
-                           # PublicTable("revision", revisiontable",
-                           #            "Base per-revision data (does not 
include text)."), // safe?
-                           # PrivateTable("text", "texttable",
-                           #            "Text blob storage. May be compressed, 
etc."), // ?
-                           PublicTable("redirect", "redirecttable", "Redirect 
list"),
-                           PublicTable("iwlinks", "iwlinkstable",
-                                       "Interwiki link tracking records"),
-                           PublicTable("geo_tags", "geotagstable",
-                                       "List of pages' geographical 
coordinates"),
-                           PublicTable("change_tag", "changetagstable",
-                                       "List of annotations (tags) for 
revisions and log entries"),
-
-                           TitleDump("pagetitlesdump", "List of page titles in 
main namespace"),
-                           AllTitleDump("allpagetitlesdump", "List of all page 
titles"),
-
-                           AbstractDump("abstractsdump", "Extracted page 
abstracts for Yahoo",
-                                        
self._get_chunk_to_do("abstractsdump"), self.wiki.db_name,
-                                        
self.chunk_info.get_pages_per_chunk_abstract())]
-
-        if self.chunk_info.chunks_enabled():
-            self.dump_items.append(RecombineAbstractDump(
-                "abstractsdumprecombine", "Recombine extracted page abstracts 
for Yahoo",
-                self.find_item_by_name('abstractsdump')))
-
-        self.dump_items.append(XmlStub("xmlstubsdump", "First-pass for page 
XML data dumps",
-                                       self._get_chunk_to_do("xmlstubsdump"),
-                                       
self.chunk_info.get_pages_per_chunk_history()))
-        if self.chunk_info.chunks_enabled():
-            self.dump_items.append(RecombineXmlStub(
-                "xmlstubsdumprecombine", "Recombine first-pass for page XML 
data dumps",
-                self.find_item_by_name('xmlstubsdump')))
-
-        # NOTE that the chunk_info thing passed here is irrelevant,
-        # these get generated from the stubs which are all done in one pass
-        self.dump_items.append(
-            XmlDump("articles",
-                    "articlesdump",
-                    "<big><b>Articles, templates, media/file descriptions, " +
-                    "and primary meta-pages.</b></big>",
-                    "This contains current versions of article content, " +
-                    "and is the archive most mirror sites will probably want.",
-                    self.find_item_by_name('xmlstubsdump'), self._prefetch, 
self._spawn,
-                    self.wiki, self._get_chunk_to_do("articlesdump"),
-                    self.chunk_info.get_pages_per_chunk_history(), checkpoints,
-                    self.checkpoint_file, self.page_id_range))
-        if self.chunk_info.chunks_enabled():
-            self.dump_items.append(
-                RecombineXmlDump(
-                    "articlesdumprecombine",
-                    "<big><b>Recombine articles, templates, media/file 
descriptions, " +
-                    "and primary meta-pages.</b></big>",
-                    "This contains current versions of article content, and is 
" +
-                    "the archive most mirror sites will probably want.",
-                    self.find_item_by_name('articlesdump')))
-
-        self.dump_items.append(
-            XmlDump("meta-current",
-                    "metacurrentdump",
-                    "All pages, current versions only.",
-                    "Discussion and user pages are included in this complete 
archive. " +
-                    "Most mirrors won't want this extra material.",
-                    self.find_item_by_name('xmlstubsdump'), self._prefetch,
-                    self._spawn, self.wiki, 
self._get_chunk_to_do("metacurrentdump"),
-                    self.chunk_info.get_pages_per_chunk_history(), checkpoints,
-                    self.checkpoint_file, self.page_id_range))
-
-        if self.chunk_info.chunks_enabled():
-            self.dump_items.append(
-                RecombineXmlDump(
-                    "metacurrentdumprecombine",
-                    "Recombine all pages, current versions only.",
-                    "Discussion and user pages are included in this complete 
archive. " +
-                    "Most mirrors won't want this extra material.",
-                    self.find_item_by_name('metacurrentdump')))
-
-        self.dump_items.append(
-            XmlLogging("Log events to all pages and users."))
-
-        if self._has_flagged_revs:
-            self.dump_items.append(
-                PublicTable("flaggedpages", "flaggedpagestable",
-                            "This contains a row for each flagged article, " +
-                            "containing the stable revision ID, if the lastest 
edit " +
-                            "was flagged, and how long edits have been 
pending."))
-            self.dump_items.append(
-                PublicTable("flaggedrevs", "flaggedrevstable",
-                            "This contains a row for each flagged revision, " +
-                            "containing who flagged it, when it was flagged, " 
+
-                            "reviewer comments, the flag values, and the " +
-                            "quality tier those flags fall under."))
-
-        if self._has_wikidata:
-            self.dump_items.append(
-                PublicTable("wb_items_per_site", "wbitemspersitetable",
-                            "For each Wikidata item, this contains rows with 
the " +
-                            "corresponding page name on a given wiki 
project."))
-            self.dump_items.append(
-                PublicTable("wb_terms", "wbtermstable",
-                            "For each Wikidata item, this contains rows with a 
label, " +
-                            "an alias and a description of the item in a given 
language."))
-            self.dump_items.append(
-                PublicTable("wb_entity_per_page", "wbentityperpagetable",
-                            "Contains a mapping of page ids and entity ids, 
with " +
-                            "an additional entity type column."))
-            self.dump_items.append(
-                PublicTable("wb_property_info", "wbpropertyinfotable",
-                            "Contains a mapping of Wikidata property ids and 
data types."))
-            self.dump_items.append(
-                PublicTable("wb_changes_subscription", 
"wbchangessubscriptiontable",
-                            "Tracks which Wikibase Client wikis are using 
which items."))
-            self.dump_items.append(
-                PublicTable("sites", "sitestable",
-                            "This contains the SiteMatrix information from " +
-                            "meta.wikimedia.org provided as a table."))
-
-        if self._has_global_usage:
-            self.dump_items.append(
-                PublicTable("globalimagelinks", "globalimagelinkstable",
-                            "Global wiki media/files usage records."))
-
-        if self._is_wikidata_client:
-            self.dump_items.append(
-                PublicTable("wbc_entity_usage", "wbcentityusagetable",
-                            "Tracks which pages use which Wikidata items or 
properties " +
-                            "and what aspect (e.g. item label) is used."))
-
-        self.dump_items.append(
-            BigXmlDump(
-                "meta-history",
-                "metahistorybz2dump",
-                "All pages with complete page edit history (.bz2)",
-                "These dumps can be *very* large, uncompressing up to " +
-                "20 times the archive download size. " +
-                "Suitable for archival and statistical use, " +
-                "most mirror sites won't want or need this.",
-                self.find_item_by_name('xmlstubsdump'), self._prefetch, 
self._spawn,
-                self.wiki, self._get_chunk_to_do("metahistorybz2dump"),
-                self.chunk_info.get_pages_per_chunk_history(),
-                checkpoints, self.checkpoint_file, self.page_id_range))
-        if self.chunk_info.chunks_enabled() and 
self.chunk_info.recombine_history():
-            self.dump_items.append(
-                RecombineXmlDump(
-                    "metahistorybz2dumprecombine",
-                    "Recombine all pages with complete edit history (.bz2)",
-                    "These dumps can be *very* large, uncompressing up to " +
-                    "100 times the archive download size. " +
-                    "Suitable for archival and statistical use, " +
-                    "most mirror sites won't want or need this.",
-                    self.find_item_by_name('metahistorybz2dump')))
-        self.dump_items.append(
-            XmlRecompressDump(
-                "meta-history",
-                "metahistory7zdump",
-                "All pages with complete edit history (.7z)",
-                "These dumps can be *very* large, uncompressing up to " +
-                "100 times the archive download size. " +
-                "Suitable for archival and statistical use, " +
-                "most mirror sites won't want or need this.",
-                self.find_item_by_name('metahistorybz2dump'),
-                self.wiki, self._get_chunk_to_do("metahistory7zdump"),
-                self.chunk_info.get_pages_per_chunk_history(),
-                checkpoints, self.checkpoint_file))
-        if self.chunk_info.chunks_enabled() and 
self.chunk_info.recombine_history():
-            self.dump_items.append(
-                RecombineXmlRecompressDump(
-                    "metahistory7zdumprecombine",
-                    "Recombine all pages with complete edit history (.7z)",
-                    "These dumps can be *very* large, uncompressing " +
-                    "up to 100 times the archive download size. " +
-                    "Suitable for archival and statistical use, " +
-                    "most mirror sites won't want or need this.",
-                    self.find_item_by_name('metahistory7zdump'), self.wiki))
-        # doing this only for recombined/full articles dump
-        if self.wiki.config.multistream_enabled:
-            if self.chunk_info.chunks_enabled():
-                input_for_multistream = "articlesdumprecombine"
-            else:
-                input_for_multistream = "articlesdump"
-            self.dump_items.append(
-                XmlMultiStreamDump(
-                    "articles",
-                    "articlesmultistreamdump",
-                    "Articles, templates, media/file descriptions, and " +
-                    "primary meta-pages, in multiple bz2 streams, 100 pages 
per stream",
-                    "This contains current versions of article content, " +
-                    "in concatenated bz2 streams, 100 pages per stream, plus a 
separate" +
-                    "index of page titles/ids and offsets into the file.  " +
-                    "Useful for offline readers, or for parallel processing of 
pages.",
-                    self.find_item_by_name(input_for_multistream), self.wiki, 
None))
-
-        results = self._runinfo_file.get_old_runinfo_from_file()
-        if results:
-            for runinfo_obj in results:
-                self._set_dump_item_runinfo(runinfo_obj)
-            self.old_runinfo_retrieved = True
-        else:
-            self.old_runinfo_retrieved = False
-
-    def append_job(self, jobname, job):
-        if jobname not in self.skip_jobs:
-            self.dump_items.append(job)
-
-    def report_dump_runinfo(self):
-        """Put together a dump run info listing for this database, with all 
its component dumps."""
-        runinfo_lines = [self._report_dump_runinfo_line(item) for item in 
self.dump_items]
-        runinfo_lines.reverse()
-        text = "\n".join(runinfo_lines)
-        text = text + "\n"
-        return text
-
-    def all_possible_jobs_done(self):
-        for item in self.dump_items:
-            if (item.status() != "done" and item.status() != "failed"
-                    and item.status() != "skipped"):
-                return False
-        return True
-
-    # determine list of dumps to run ("table" expands to all table dumps,
-    # the rest of the names expand to single items)
-    # and mark the items in the list as such
-    # return False if there is no such dump or set of dumps
-    def mark_dumps_to_run(self, job, skipgood=False):
-        if job == "tables":
-            for item in self.dump_items:
-                if item.name()[-5:] == "table":
-                    if item.name in self.skip_jobs:
-                        item.set_skipped()
-                    elif not skipgood or item.status() != "done":
-                        item.set_to_run(True)
-            return True
-        else:
-            for item in self.dump_items:
-                if item.name() == job:
-                    if item.name in self.skip_jobs:
-                        item.set_skipped()
-                    elif not skipgood or item.status() != "done":
-                        item.set_to_run(True)
-                    return True
-        if job == "noop" or job == "latestlinks" or job == "createdirs":
-            return True
-        sys.stderr.write("No job of the name specified exists. Choose one of 
the following:\n")
-        sys.stderr.write("noop (runs no job but rewrites checksums files and"
-                         "resets latest links)\n")
-        sys.stderr.write("latestlinks (runs no job but resets latest links)\n")
-        sys.stderr.write("createdirs (runs no job but creates dump dirs for 
the given date)\n")
-        sys.stderr.write("tables (includes all items below that end in 
'table')\n")
-        for item in self.dump_items:
-            sys.stderr.write("%s\n" % item.name())
-            return False
-
-    def mark_following_jobs_to_run(self, skipgood=False):
-        # find the first one marked to run, mark the following ones
-        i = 0
-        for item in self.dump_items:
-            i = i + 1
-            if item.to_run():
-                for j in range(i, len(self.dump_items)):
-                    if item.name in self.skip_jobs:
-                        item.set_skipped()
-                    elif not skipgood or item.status() != "done":
-                        self.dump_items[j].set_to_run(True)
-                break
-
-    def mark_all_jobs_to_run(self, skipgood=False):
-        """Marks each and every job to be run"""
-        for item in self.dump_items:
-            if item.name() in self.skip_jobs:
-                item.set_skipped()
-            elif not skipgood or item.status() != "done":
-                item.set_to_run(True)
-
-    def find_item_by_name(self, name):
-        for item in self.dump_items:
-            if item.name() == name:
-                return item
-        return None
-
-    def _get_chunk_to_do(self, job_name):
-        if self._single_job:
-            if self._single_job == job_name:
-                return self._chunk_todo
-        return False
-
-    # read in contents from dump run info file and stuff into dump_items for 
later reference
-    def _set_dump_item_runinfo(self, runinfo):
-        if not runinfo.name():
-            return False
-        for item in self.dump_items:
-            if item.name() == runinfo.name():
-                item.set_status(runinfo.status(), False)
-                item.set_updated(runinfo.updated())
-                item.set_to_run(runinfo.to_run())
-                return True
-        return False
-
-    # write dump run info file
-    # (this file is rewritten with updates after each dumpItem completes)
-    def _report_dump_runinfo_line(self, item):
-        # even if the item has never been run we will at least have "waiting" 
in the status
-        return "name:%s; status:%s; updated:%s" % (item.name(), item.status(), 
item.updated())
-
-
-class Runner(object):
-    def __init__(self, wiki, prefetch=True, spawn=True, job=None, 
skip_jobs=None,
-                 restart=False, notice="", dryrun=False, logging_enabled=False,
-                 chunk_to_do=False, checkpoint_file=None, page_id_range=None,
-                 skipdone=False, verbose=False):
-        self.wiki = wiki
-        self.db_name = wiki.db_name
-        self.prefetch = prefetch
-        self.spawn = spawn
-        self.chunk_info = Chunk(wiki, self.db_name, self.log_and_print)
-        self.restart = restart
-        self.html_notice_file = None
-        self.log = None
-        self.dryrun = dryrun
-        self._chunk_todo = chunk_to_do
-        self.checkpoint_file = checkpoint_file
-        self.page_id_range = page_id_range
-        self.skipdone = skipdone
-        self.verbose = verbose
-
-        if self.checkpoint_file is not None:
-            fname = DumpFilename(self.wiki)
-            fname.new_from_filename(checkpoint_file)
-            # we should get chunk if any
-            if not self._chunk_todo and fname.chunk_int:
-                self._chunk_todo = fname.chunk_int
-            elif self._chunk_todo and fname.chunk_int and self._chunk_todo != 
fname.chunk_int:
-                raise BackupError("specifed chunk to do does not match chunk "
-                                  "of checkpoint file %s to redo", 
self.checkpoint_file)
-            self.checkpoint_file = fname
-
-        self._logging_enabled = logging_enabled
-        self._status_enabled = True
-        self._checksummer_enabled = True
-        self._runinfo_file_enabled = True
-        self._symlinks_enabled = True
-        self._feeds_enabled = True
-        self._notice_file_enabled = True
-        self._makedir_enabled = True
-        self._clean_old_dumps_enabled = True
-        self._cleanup_old_files_enabled = True
-        self._check_for_trunc_files_enabled = True
-
-        if self.dryrun or self._chunk_todo:
-            self._status_enabled = False
-            self._checksummer_enabled = False
-            self._runinfo_file_enabled = False
-            self._symlinks_enabled = False
-            self._feeds_enabled = False
-            self._notice_file_enabled = False
-            self._makedir_enabled = False
-            self._clean_old_dumps_enabled = False
-
-        if self.dryrun:
-            self._logging_enabled = False
-            self._check_for_trunc_files_enabled = False
-            self._cleanup_old_files_enabled = False
-
-        if self.checkpoint_file is not None:
-            self._status_enabled = False
-            self._checksummer_enabled = False
-            self._runinfo_file_enabled = False
-            self._symlinks_enabled = False
-            self._feeds_enabled = False
-            self._notice_file_enabled = False
-            self._makedir_enabled = False
-            self._clean_old_dumps_enabled = False
-
-        if self.page_id_range:
-            self._status_enabled = False
-            self._checksummer_enabled = False
-            self._runinfo_file_enabled = False
-            self._symlinks_enabled = False
-            self._feeds_enabled = False
-            self._notice_file_enabled = False
-            self._makedir_enabled = False
-            self._cleanup_old_files_enabled = True
-
-        self.job_requested = job
-
-        self.skip_jobs = skip_jobs
-        if skip_jobs is None:
-            self.skip_jobs = []
-
-        if self.job_requested == "latestlinks":
-            self._status_enabled = False
-            self._runinfo_file_enabled = False
-
-        if self.job_requested == "createdirs":
-            self._symlinks_enabled = False
-            self._feeds_enabled = False
-
-        if self.job_requested == "latestlinks" or self.job_requested == 
"createdirs":
-            self._checksummer_enabled = False
-            self._notice_file_enabled = False
-            self._makedir_enabled = False
-            self._clean_old_dumps_enabled = False
-            self._cleanup_old_files_enabled = False
-            self._check_for_trunc_files_enabled = False
-
-        if self.job_requested == "noop":
-            self._clean_old_dumps_enabled = False
-            self._cleanup_old_files_enabled = False
-            self._check_for_trunc_files_enabled = False
-
-        self.db_server_info = DbServerInfo(self.wiki, self.db_name, 
self.log_and_print)
-        self.dump_dir = DumpDir(self.wiki, self.db_name)
-
-        # these must come after the dumpdir setup so we know which directory 
we are in
-        if self._logging_enabled and self._makedir_enabled:
-            file_obj = DumpFilename(self.wiki)
-            file_obj.new_from_filename(self.wiki.config.log_file)
-            self.log_filename = self.dump_dir.filename_private_path(file_obj)
-            self.make_dir(os.path.join(self.wiki.private_dir(), 
self.wiki.date))
-            self.log = Logger(self.log_filename)
-            thread.start_new_thread(self.log_queue_reader, (self.log,))
-        self.runinfo_file = RunInfoFile(wiki, self._runinfo_file_enabled,
-                                        self.verbose)
-        self.sym_links = SymLinks(self.wiki, self.dump_dir, self.log_and_print,
-                                  self.debug, self._symlinks_enabled)
-        self.feeds = Feeds(self.wiki, self.dump_dir, self.db_name, self.debug,
-                           self._feeds_enabled)
-        self.html_notice_file = NoticeFile(self.wiki, notice,
-                                           self._notice_file_enabled)
-        self.checksums = Checksummer(self.wiki, self.dump_dir,
-                                     self._checksummer_enabled, self.verbose)
-
-        # some or all of these dump_items will be marked to run
-        self.dump_item_list = DumpItemList(self.wiki, self.prefetch, 
self.spawn,
-                                           self._chunk_todo, 
self.checkpoint_file,
-                                           self.job_requested, self.skip_jobs,
-                                           self.chunk_info, self.page_id_range,
-                                           self.runinfo_file, self.dump_dir)
-        # only send email failure notices for full runs
-        if self.job_requested:
-            email = False
-        else:
-            email = True
-        self.status = Status(self.wiki, self.dump_dir, 
self.dump_item_list.dump_items,
-                             self.checksums, self._status_enabled, email,
-                             self.html_notice_file, self.log_and_print, 
self.verbose)
-
-    def log_queue_reader(self, log):
-        if not log:
-            return
-        done = False
-        while not done:
-            done = log.do_job_on_log_queue()
-
-    def log_and_print(self, message):
-        if hasattr(self, 'log') and self.log and self._logging_enabled:
-            self.log.add_to_log_queue("%s\n" % message)
-        sys.stderr.write("%s\n" % message)
-
-    # returns 0 on success, 1 on error
-    def save_command(self, commands, outfile):
-        """For one pipeline of commands, redirect output to a given file."""
-        commands[-1].extend([">", outfile])
-        series = [commands]
-        if self.dryrun:
-            self.pretty_print_commands([series])
-            return 0
-        else:
-            return self.run_command([series], 
callback_timed=self.status.update_status_files)
-
-    def pretty_print_commands(self, command_series_list):
-        for series in command_series_list:
-            for pipeline in series:
-                command_strings = []
-                for command in pipeline:
-                    command_strings.append(" ".join(command))
-                pipeline_string = " | ".join(command_strings)
-                print "Command to run: ", pipeline_string
-
-    # command series list: list of (commands plus args)
-    # is one pipeline. list of pipelines = 1 series.
-    # this function wants a list of series.
-    # be a list (the command name and the various args)
-    # If the shell option is true, all pipelines will be run under the shell.
-    # callbackinterval: how often we will call callback_timed (in 
milliseconds),
-    # defaults to every 5 secs
-    def run_command(self, command_series_list, callback_stderr=None,
-                    callback_stderr_arg=None, callback_timed=None,
-                    callback_timed_arg=None, shell=False, 
callback_interval=5000):
-        """Nonzero return code from the shell from any command in any pipeline 
will cause
-        this function to print an error message and return 1, indicating error.
-        Returns 0 on success.
-        If a callback function is passed, it will receive lines of
-        output from the call.  If the callback function takes another argument 
(which will
-        be passed before the line of output) must be specified by the arg 
paraemeter.
-        If no callback is provided, and no output file is specified for a given
-        pipe, the output will be written to stderr. (Do we want that?)
-        This function spawns multiple series of pipelines  in parallel.
-
-        """
-        if self.dryrun:
-            self.pretty_print_commands(command_series_list)
-            return 0
-
-        else:
-            commands = CommandsInParallel(command_series_list, 
callback_stderr=callback_stderr,
-                                          
callback_stderr_arg=callback_stderr_arg,
-                                          callback_timed=callback_timed,
-                                          
callback_timed_arg=callback_timed_arg,
-                                          shell=shell, 
callback_interval=callback_interval)
-            commands.run_commands()
-            if commands.exited_successfully():
-                return 0
-            else:
-                problem_commands = commands.commands_with_errors()
-                error_string = "Error from command(s): "
-                for cmd in problem_commands:
-                    error_string = error_string + "%s " % cmd
-                self.log_and_print(error_string)
-                return 1
-
-    def debug(self, stuff):
-        self.log_and_print("%s: %s %s" % (TimeUtils.pretty_time(), 
self.db_name, stuff))
-
-    def run_handle_failure(self):
-        if self.status.fail_count < 1:
-            # Email the site administrator just once per database
-            self.status.report_failure()
-        self.status.fail_count += 1
-
-    def run_update_item_fileinfo(self, item):
-        # this will include checkpoint files if they are enabled.
-        for file_obj in item.list_outfiles_to_publish(self.dump_dir):
-            if exists(self.dump_dir.filename_public_path(file_obj)):
-                # why would the file not exist? because we changed chunk 
numbers in the
-                # middle of a run, and now we list more files for the next 
stage than there
-                # were for earlier ones
-                self.sym_links.save_symlink(file_obj)
-                self.feeds.save_feed(file_obj)
-                self.checksums.checksums(file_obj, self)
-                self.sym_links.cleanup_symlinks()
-                self.feeds.cleanup_feeds()
-
-    def run(self):
-        if self.job_requested:
-            if not self.dump_item_list.old_runinfo_retrieved and 
self.wiki.exists_perdump_index():
-
-                # There was a previous run of all or part of this date, but...
-                # There was no old RunInfo to be had (or an error was 
encountered getting it)
-                # so we can't rerun a step and keep all the status information
-                # about the old run around.
-                # In this case ask the user if they reeeaaally want to go ahead
-                print "No information about the previous run for this date 
could be retrieved."
-                print "This means that the status information about the old 
run will be lost, and"
-                print "only the information about the current (and future) 
runs will be kept."
-                reply = raw_input("Continue anyways? [y/N]: ")
-                if reply not in ["y", "Y"]:
-                    raise RuntimeError("No run information available for 
previous dump, exiting")
-
-            if not self.dump_item_list.mark_dumps_to_run(self.job_requested, 
self.skipdone):
-                # probably no such job
-                sys.stderr.write("No job marked to run, exiting")
-                return None
-            if self.restart:
-                # mark all the following jobs to run as well
-                self.dump_item_list.mark_following_jobs_to_run(self.skipdone)
-        else:
-            self.dump_item_list.mark_all_jobs_to_run(self.skipdone)
-
-        Maintenance.exit_if_in_maintenance_mode(
-            "In maintenance mode, exiting dump of %s" % self.db_name)
-
-        self.make_dir(os.path.join(self.wiki.public_dir(), self.wiki.date))
-        self.make_dir(os.path.join(self.wiki.private_dir(), self.wiki.date))
-
-        self.show_runner_state("Cleaning up old dumps for %s" % self.db_name)
-        self.clean_old_dumps()
-        self.clean_old_dumps(private=True)
-
-        # Informing what kind backup work we are about to do
-        if self.job_requested:
-            if self.restart:
-                self.log_and_print("Preparing for restart from job %s of %s"
-                                   % (self.job_requested, self.db_name))
-            else:
-                self.log_and_print("Preparing for job %s of %s" %
-                                   (self.job_requested, self.db_name))
-        else:
-            self.show_runner_state("Starting backup of %s" % self.db_name)
-
-        self.checksums.prepare_checksums()
-
-        for item in self.dump_item_list.dump_items:
-            Maintenance.exit_if_in_maintenance_mode(
-                "In maintenance mode, exiting dump of %s at step %s"
-                % (self.db_name, item.name()))
-            if item.to_run():
-                item.start()
-                self.status.update_status_files()
-                self.runinfo_file.save_dump_runinfo_file(
-                    self.dump_item_list.report_dump_runinfo())
-                try:
-                    item.dump(self)
-                except Exception, ex:
-                    exc_type, exc_value, exc_traceback = sys.exc_info()
-                    if self.verbose:
-                        sys.stderr.write(repr(traceback.format_exception(
-                            exc_type, exc_value, exc_traceback)))
-                    else:
-                        if exc_type.__name__ == 'BackupPrereqError':
-                            self.debug(str(ex))
-                        else:
-                            self.debug("*** exception! " + str(ex))
-                        if exc_type.__name__ != 'BackupPrereqError':
-                            item.set_status("failed")
-
-            if item.status() == "done":
-                self.checksums.cp_chksum_tmpfiles_to_permfile()
-                self.run_update_item_fileinfo(item)
-            elif item.status() == "waiting" or item.status() == "skipped":
-                # don't update the checksum files for this item.
-                continue
-            else:
-                # Here for example status is "failed". But maybe also
-                # "in-progress", if an item chooses to override dump(...) and
-                # forgets to set the status. This is a failure as well.
-                self.run_handle_failure()
-
-        # special case
-        if self.job_requested == "createdirs":
-            if not os.path.exists(os.path.join(self.wiki.public_dir(), 
self.wiki.date)):
-                os.makedirs(os.path.join(self.wiki.public_dir(), 
self.wiki.date))
-            if not os.path.exists(os.path.join(self.wiki.private_dir(), 
self.wiki.date)):
-                os.makedirs(os.path.join(self.wiki.private_dir(), 
self.wiki.date))
-
-        if self.dump_item_list.all_possible_jobs_done():
-            # All jobs are either in status "done", "waiting", "failed", 
"skipped"
-            self.status.update_status_files("done")
-        else:
-            # This may happen if we start a dump now and abort before all 
items are
-            # done. Then some are left for example in state "waiting". When
-            # afterwards running a specific job, all (but one) of the jobs
-            # previously in "waiting" are still in status "waiting"
-            self.status.update_status_files("partialdone")
-
-        
self.runinfo_file.save_dump_runinfo_file(self.dump_item_list.report_dump_runinfo())
-
-        # if any job succeeds we might as well make the sym link
-        if self.status.fail_count < 1:
-            self.complete_dump()
-
-        if self.job_requested:
-            # special case...
-            if self.job_requested == "latestlinks":
-                if self.dump_item_list.all_possible_jobs_done():
-                    
self.sym_links.remove_symlinks_from_old_runs(self.wiki.date)
-                    self.feeds.cleanup_feeds()
-
-        # Informing about completion
-        if self.job_requested:
-            if self.restart:
-                self.show_runner_state("Completed run restarting from job %s 
for %s"
-                                       % (self.job_requested, self.db_name))
-            else:
-                self.show_runner_state("Completed job %s for %s"
-                                       % (self.job_requested, self.db_name))
-        else:
-            self.show_runner_state_complete()
-
-        # let caller know if this was a successful run
-        if self.status.fail_count > 0:
-            return False
-        else:
-            return True
-
-    def clean_old_dumps(self, private=False):
-        """Removes all but the wiki.config.keep last dumps of this wiki.
-        If there is already a directory for todays dump, this is omitted in 
counting
-        and not removed."""
-        if self._clean_old_dumps_enabled:
-            if private:
-                old = self.wiki.dump_dirs(private=True)
-                dumptype = 'private'
-            else:
-                old = self.wiki.dump_dirs()
-                dumptype = 'public'
-            if old:
-                if old[-1] == self.wiki.date:
-                    # If we're re-running today's (or jobs from a given day's) 
dump, don't count
-                    # it as one of the old dumps to keep... or delete it 
halfway through!
-                    old = old[:-1]
-                if self.wiki.config.keep > 0:
-                    # Keep the last few
-                    old = old[:-(self.wiki.config.keep)]
-            if old:
-                for dump in old:
-                    self.show_runner_state("Purging old %s dump %s for %s" %
-                                           (dumptype, dump, self.db_name))
-                    if private:
-                        base = os.path.join(self.wiki.private_dir(), dump)
-                    else:
-                        base = os.path.join(self.wiki.public_dir(), dump)
-                    shutil.rmtree("%s" % base)
-            else:
-                self.show_runner_state("No old %s dumps to purge." % dumptype)
-
-    def show_runner_state(self, message):
-        self.debug(message)
-
-    def show_runner_state_complete(self):
-        self.debug("SUCCESS: done.")
-
-    def complete_dump(self):
-        # note that it's possible for links in "latest" to point to
-        # files from different runs, in which case the checksum files
-        # will have accurate checksums for the run for which it was
-        # produced, but not the other files. FIXME
-        self.checksums.move_chksumfiles_into_place()
-        for htype in self.checksums.hashtypes:
-            dumpfile = DumpFilename(
-                self.wiki, None, 
self.checksums.get_checksum_filename_basename(htype))
-            self.sym_links.save_symlink(dumpfile)
-        self.sym_links.cleanup_symlinks()
-
-        for item in self.dump_item_list.dump_items:
-            if item.to_run():
-                dump_names = item.list_dumpnames()
-                if type(dump_names).__name__ != 'list':
-                    dump_names = [dump_names]
-
-                if item._chunks_enabled:
-                    # if there is a specific chunk, we want to only clear out
-                    # old files for that piece, because new files for the other
-                    # pieces may not have been generated yet.
-                    chunk = item._chunk_todo
-                else:
-                    chunk = None
-
-                checkpoint = None
-                if item._checkpoints_enabled:
-                    if item.checkpoint_file is not None:
-                        # if there's a specific checkpoint file we are
-                        # rerunning, we would only clear out old copies
-                        # of that very file. meh. how likely is it that we
-                        # have one? these files are time based and the 
start/end pageids
-                        # are going to fluctuate. whatever
-                        checkpoint = item.checkpoint_file.checkpoint
-
-                for dump in dump_names:
-                    self.sym_links.remove_symlinks_from_old_runs(
-                        self.wiki.date, dump, chunk, checkpoint, 
onlychunks=item.onlychunks)
-
-                self.feeds.cleanup_feeds()
-
-    def make_dir(self, dirname):
-        if self._makedir_enabled:
-            if exists(dirname):
-                self.debug("Checkdir dir %s ..." % dirname)
-            else:
-                self.debug("Creating %s ..." % dirname)
-                os.makedirs(dirname)
+from dumps.jobs import DumpDir, DumpFilename
+from dumps.runner import Runner, DumpItemList
+from dumps.utils import RunInfoFile, Chunk
 
 
 def check_jobs(wiki, date, job, skipjobs, page_id_range, chunk_to_do,

-- 
To view, visit https://gerrit.wikimedia.org/r/250433
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I313ca05d667bf0df9eb7a73022ec37aca6106e9b
Gerrit-PatchSet: 1
Gerrit-Project: operations/dumps
Gerrit-Branch: ariel
Gerrit-Owner: ArielGlenn <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to