ArielGlenn has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/78043


Change subject: puppetize and enable production xml dumps rsync to gluster 
public labs share
......................................................................

puppetize and enable production xml dumps rsync to gluster public labs share

Change-Id: I7fc957a6cc1042a937b6eb89ff0dbc44847ba19b
---
A files/mirror/gluster-rsync-cron.sh
A files/mirror/wmfdumpsmirror.py
M manifests/misc/download.pp
M manifests/site.pp
4 files changed, 814 insertions(+), 14 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/43/78043/1

diff --git a/files/mirror/gluster-rsync-cron.sh 
b/files/mirror/gluster-rsync-cron.sh
new file mode 100644
index 0000000..7fb8283
--- /dev/null
+++ b/files/mirror/gluster-rsync-cron.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+running=`pgrep -u root -f   'python /root/wmfdumpsmirror.py --remotedir 
/data/xmldatadumps/public'`
+if [ -z "$running" ]; then
+    python /usr/local/bin/wmfdumpsmirror.py --remotedir 
/data/xmldatadumps/public --localdir /mnt/glusterpublicdata/public 
--filesperjob 50 --sizeperjob 5G --workercount 3 --rsynclist 
rsync-list.txt.rsync
+fi
+running=`pgrep -u root -f -x  '/usr/bin/rsync -a 
/data/xmldatadumps/public/other/incr /mnt/glusterpublicdata/public/other/'`
+if [ -z "$running" ]; then
+    /usr/bin/rsync -a /data/xmldatadumps/public/other/incr 
/mnt/glusterpublicdata/public/other/
+fi
diff --git a/files/mirror/wmfdumpsmirror.py b/files/mirror/wmfdumpsmirror.py
new file mode 100644
index 0000000..9c30d4d
--- /dev/null
+++ b/files/mirror/wmfdumpsmirror.py
@@ -0,0 +1,758 @@
+import getopt
+import os
+import re
+import sys
+import subprocess
+import shutil
+import multiprocessing
+from subprocess import Popen, PIPE
+from Queue import Empty
+
+class Job(object):
+    def __init__(self, jobId, jobContents):
+        self.jobId = jobId # this must be unique across all jobs
+        self.contents = jobContents
+        self.done = False
+        self.failed = False
+
+    def markDone(self):
+        self.done = True
+
+    def markFailed(self):
+        self.failed = True
+
+    def checkIfDone(self):
+        return self.done
+
+    def checkIfFailed(self):
+        return self.failed
+
+class RsyncJob(Job):
+    datePattern = re.compile('^20[0-9]{6}$')
+    def __init__(self, contents):
+        super( RsyncJob, self ).__init__(contents[0], contents)
+        self.rsyncedByJob = self.getDirsPerProjectRsyncedByJob()
+
+    # things that get here should look like:
+    # aawikibooks/20120317/aawikibooks-20120317-all-titles-in-ns0.gz
+    def _getPathComponentsFromFileName(self, path):
+        if not os.sep in path:
+            raise MirrorError("bad line encuntered in rsync directory list: 
'%s'" % path)
+
+        components = path.split(os.sep)
+        if len(components) < 3 or not 
RsyncJob.datePattern.search(components[-2]):
+            raise MirrorError("what garbage is this: %s in the filenames for 
rsync? " % path)
+        return components
+
+    def getDirsPerProjectRsyncedByJob(self):
+        """return has of projects which are partially or completely
+        rsynced by this job, each has key having as value the dirs that
+        are rsynced"""
+
+        projects = {}
+        for line in self.contents:
+            if not os.sep in line:
+                # files that aren't part of the project dumps but
+                # are included in the rsync... for example various
+                # html files that might be at the top of the tree;
+                # don't dig through their names looking for project dump info
+                continue
+            components = self._getPathComponentsFromFileName(line)
+            if len(components):
+                project = os.sep + components[-3]
+                projectSubdir = components[-2]
+                projectFile = components[-1]
+                if project not in projects.keys():
+                    projects[project] = {}
+                if projectSubdir not in projects[project]:
+                    projects[project][projectSubdir] = []
+                projects[project][projectSubdir].append(projectFile)
+            
+        return projects
+
+class RsyncFilesProcessor(object):
+    # for now we have the file list be a flat file, sometime in the
+    # not to distant future it will be maybe a stream cause we'll be
+    # feeding a list from the api, that will be sketchy
+    def __init__(self, fileListFd, maxFilesPerJob, maxDuPerJob, workerCount, 
rsyncRemotePath, localPath, rsyncArgs, verbose, dryrun):
+        self.fileListFd = fileListFd
+        self.maxFilesPerJob = maxFilesPerJob
+        self.maxDuPerJob = maxDuPerJob
+        self.verbose = verbose
+        self.dryrun = dryrun
+        self.rsyncArgs = rsyncArgs
+        self.localPath = localPath
+        self.rsyncer = Rsyncer(rsyncRemotePath, localPath, self.rsyncArgs, 
self.verbose, self.dryrun)
+        self.jQ = JobQueue(workerCount, self.rsyncer, self.verbose, 
self.dryrun)
+        self.datePattern = re.compile('^20[0-9]{6}$')
+        self.jobsPerProject = {}
+        self.jobs = {}
+        self.deleter = DirDeleter(self.jobsPerProject, self.localPath, 
self.verbose, self.dryrun)
+
+    def _getFileSize(self, line):
+        return int(line.split()[1])
+        
+    def _getPath(self, line):
+        return line.split()[4]
+
+    def _checkLineWanted(self, line):
+        """is this a line we want, it has information about a
+        file for our jobs? if so return true, if not return
+        false.  we assume lines starting with '#' are comments,
+        blank lines are to be skipped, and we don't want
+        directory entries, only files and/or symlinks"""
+        if not line or line[0] == 'd' or line[0] == '#':
+            return False
+        else:
+            return True
+
+    def _getFileName(self, line):
+
+        # the input consists of a list of filenames plus other info and we 
+        # can expect the dumps of one project to be listed in consecutive 
+        # lines rather than scattered about in the file (which is of no 
+        # concern for us but is good for rsync)
+        # it's produced by rsync --list-only...
+
+        # example:
+
+        # drwxrwxr-x        4096 2012/03/17 13:23:04 aawikibooks
+        # drwxr-xr-x        4096 2012/03/17 13:24:10 aawikibooks/20120317
+        # -rw-r--r--          39 2012/03/17 13:23:54 
aawikibooks/20120317/aawikibooks-20120317-all-titles-in-ns0.gz
+        # -rw-r--r--         760 2012/03/17 13:23:39 
aawikibooks/20120317/aawikibooks-20120317-category.sql.gz
+        # -rw-r--r--         826 2012/03/17 13:23:23 
aawikibooks/20120317/aawikibooks-20120317-categorylinks.sql.gz
+        # -rw-r--r--        1513 2012/03/17 13:23:30 
aawikibooks/20120317/aawikibooks-20120317-externallinks.sql.gz
+
+        # we may also have a few files in the top level directory that
+        # we want the mirrors to pick up (text or html files of particular 
interest)
+
+        # note that the directories are also listed, we want to skip those
+        # we'll allow commnts in there in case some other script produces the 
files
+        # or humans edit them; skip those and empty lines, the rest should be 
good data
+        path = self._getPath(line)
+        if not os.sep in path:
+            return line
+        else:
+            return line.split(os.sep)[-1]
+
+    def stuffJobsOnQueue(self):
+        fileCount = 0
+        fileDu = 0
+        files = []
+        line = self.fileListFd.readline().rstrip()
+        while line:
+            if not self._checkLineWanted(line):
+                line = self.fileListFd.readline().rstrip()
+                continue
+            path = self._getPath(line)
+            if path:
+                fileCount = fileCount + 1
+                fileDu = fileDu + self._getFileSize(line)
+                files.append(path)
+                if fileDu >= self.maxDuPerJob or fileCount >= 
self.maxFilesPerJob:
+                    job = self.makeJob(files)
+                    if self.dryrun or self.verbose:
+                        MirrorMsg.display("adding job %s (size %d and 
filecount %d) to queue\n" % (job.jobId, fileDu, fileCount))
+                    self.jQ.addToJobQueue(job)
+                    fileDu = 0
+                    fileCount = 0
+                    files = []
+            line = self.fileListFd.readline().rstrip()
+
+        if fileCount:
+            if self.dryrun or self.verbose:
+                MirrorMsg.display("adding job %s (size %d and filecount %d) to 
queue\n" % (job.jobId, fileDu, fileCount))
+            self.jQ.addToJobQueue(self.makeJob(files))
+
+        self.jQ.setEndOfJobs()
+        self.deleter.setJobList(self.jobs)
+
+    def makeJob(self, files):
+        job = RsyncJob(files)
+        for project in job.rsyncedByJob.keys():
+            if project not in self.jobsPerProject.keys():
+                self.jobsPerProject[project] = []
+            self.jobsPerProject[project].append(job.jobId)
+        self.jobs[job.jobId] = job
+        return job
+
+    def doPostJobProcessing(self, skipDeletes):
+        while True:
+            # any completed jobs?
+            job = self.jQ.getJobFromNotifyQueue()
+            # no more jobs and mo more workers. 
+            if not job:
+                if not self.jQ.getActiveWorkerCount():
+                    if self.dryrun or self.verbose:
+                        MirrorMsg.display( "no jobs left and no active 
workers\n")
+                    break
+                else:
+                    continue
+            if self.dryrun:
+                MirrorMsg.display("jobId %s would have been completed\n" % 
job.jobId)
+            elif self.verbose:
+                MirrorMsg.display("jobId %s completed\n" % job.jobId)
+
+            # update status of job in our todo queue
+            j = self.jobs[job.jobId]
+            if job.checkIfDone():
+                j.markDone()
+            if job.checkIfFailed():
+                j.markFailed()
+
+            if not skipDeletes:
+                if self.verbose or self.dryrun:
+                    MirrorMsg.display("checking post-job deletions\n")
+                self.deleter.checkAndDoDeletes(j)
+
+class DirDeleter(object):
+    """remove all dirs for the project that are not in the 
+    list of dirs to rsync, we don't want them any more"""
+    def __init__(self, jobsPerProject, localPath, verbose, dryrun):
+        self.jobsPerProject = jobsPerProject
+        self.localPath = localPath
+        self.verbose = verbose
+        self.dryrun = dryrun
+
+    def getFullLocalPath(self, relPath):
+        if relPath.startswith(os.sep):
+            relPath = relPath[len(os.sep):]
+        return(os.path.join(self.localPath, relPath))
+
+    def setJobList(self, jobList):
+        self.jobList = jobList
+
+    def checkAndDoDeletes(self, job):
+        """given a file list, we need to see if we are done with
+        one project and on to the next, which things we rsynced and
+        which not, and delete the ones not (i.e. left over from previous
+        run and we don't want them now); failed rsyncs may not have
+        completed normally so we won't do deletions for a project
+        with failed jobs"""
+        for project in job.rsyncedByJob.keys():
+            ids = [ self.jobList[jobId] for jobId in 
self.jobsPerProject[project] if not self.jobList[jobId].checkIfDone() or 
self.jobList[jobId].checkIfFailed() ]
+            if not len(ids):
+                if self.dryrun:
+                    MirrorMsg.display("Would do deletes for project %s\n" % 
project)
+                elif self.verbose:
+                    MirrorMsg.display("Doing deletes for project %s\n" % 
project)
+                self.doDeletes(project)
+            else:
+                if self.verbose:
+                    MirrorMsg.display("No deletes for project %s\n" % project)
+                    
+    def getListOfDirsRsyncedForProject(self, project):
+        """get directories we synced for this project, 
+        across all jobs"""
+        dirsForProject = []
+        for jobId in self.jobsPerProject[project]:
+            dirsForProject.extend([ k for k in 
self.jobList[jobId].rsyncedByJob[project].keys() if not k in dirsForProject ])
+        return dirsForProject
+
+    def getListOfFilesRsyncedForDirOfProject(self, project, dirName):
+        """get files we synced for a specific dir for
+        this project, across all jobs"""
+        filesForDirInProject = []
+        for jobId in self.jobsPerProject[project]:
+            if dirName in self.jobList[jobId].rsyncedByJob[project].keys():
+                
filesForDirInProject.extend(self.jobList[jobId].rsyncedByJob[project][dirName])
+        return filesForDirInProject
+
+    def doDeletes(self, project):
+        # fixme a sanity check here would be nice before we just remove stuff
+
+        # find which dirs were rsynced for this project,
+        # remove the ones we didn't as we no longer want them
+        projectDirsRsynced = self.getListOfDirsRsyncedForProject(project)
+
+        if not os.path.exists(self.getFullLocalPath(project)):
+            return
+        dirs = os.listdir(self.getFullLocalPath(project))
+
+        if self.dryrun or self.verbose:
+            MirrorMsg.display("for project %s:" % project)
+        if self.dryrun:
+            MirrorMsg.display("would delete (dirs): ", True)
+        elif self.verbose:
+            MirrorMsg.display("deleting (dirs): ", True)
+
+        if not len(dirs):
+            if self.dryrun or self.verbose:
+                MirrorMsg.display("None", True)
+
+        for d in dirs:
+            if not d in projectDirsRsynced:
+                dirName = os.path.join(project, d)
+                if self.dryrun or self.verbose:
+                    MirrorMsg.display( "'%s'" % dirName , True)
+                if not self.dryrun:
+                    try:
+                        shutil.rmtree(self.getFullLocalPath(dirName))
+                    except:
+                        MirrorMsg.warn("failed to remove directory or contents 
of %s\n" % self.getFullLocalPath(dirName))
+                        pass
+        if self.dryrun or self.verbose:
+            MirrorMsg.display('\n', True)
+
+        # now for the dirs we did rsync, check the files existing now
+        # against the files that we rsynced, and remove the extraneous ones
+        if self.dryrun or self.verbose:
+            MirrorMsg.display("for project %s:" % project)
+        if self.dryrun:
+            MirrorMsg.display("would delete (files): ", True)
+        elif self.verbose:
+            MirrorMsg.display("deleting (files): ", True)
+
+        for d in dirs:
+            if d in projectDirsRsynced:
+                filesExisting = 
os.listdir(self.getFullLocalPath(os.path.join(project, d)))
+                filesRsynced = 
self.getListOfFilesRsyncedForDirOfProject(project, d)
+                filesToToss = [ f for f in filesExisting if not f in 
filesRsynced ]
+
+                if self.dryrun or self.verbose:
+                    MirrorMsg.display( "for directory "+ d, True)
+                    if not len(filesToToss):
+                        MirrorMsg.display("None", True)
+                for f in filesToToss:
+                    fileName = self.getFullLocalPath(os.path.join(project, d, 
f))
+                    if os.path.isdir(fileName):
+                            continue
+                    if self.dryrun or self.verbose:
+                        # we should never be pushing directories across as 
part of the rsync. 
+                        # so if we have a local directory, leave it alone
+                        MirrorMsg.display( "'%s'" % f , True)
+                    if not self.dryrun:
+                        try:
+                            os.unlink(fileName)
+                        except:
+                            MirrorMsg.warn("failed to unlink file %s\n" % 
fileName)
+                            pass
+        if self.dryrun or self.verbose:
+            MirrorMsg.display('\n', True)
+
+class JobHandler(object):
+    def init(self):
+        """this should be overriden to set and args
+        that you need to actually process a job"""
+        pass
+    
+    def doJob(self, contents):
+        """override this with a function that processes
+        contents as desired"""
+        print contents
+        return False
+
+class Rsyncer(JobHandler):
+    """all the info about rsync you ever wanted to know but were afraid to 
ask..."""
+    def __init__(self, rsyncRemotePath, localPath, rsyncArgs, verbose, dryrun):
+        self.rsyncRemotePath = rsyncRemotePath
+        self.localPath = localPath
+        self.rsyncArgs = rsyncArgs
+        self.verbose = verbose
+        self.dryrun = dryrun
+        self.cmd = Command(verbose, dryrun)
+
+    def doJob(self, contents):
+        return self.doRsync(contents)
+
+    def doRsync(self, files):
+        command = [ "/usr/bin/rsync" ]
+        command.extend([ "--files-from", "-" ])
+        command.extend( self.rsyncArgs )
+        command.extend([ self.rsyncRemotePath,  self.localPath ])
+
+        if self.dryrun or self.verbose:
+            commandString = " ".join(command)
+        if self.dryrun:
+            MirrorMsg.display("would run %s" % commandString)
+        elif self.verbose:
+            MirrorMsg.display("running %s" % commandString)
+        if self.dryrun or self.verbose:
+            MirrorMsg.display("with input:\n" + '\n'.join(files) + '\n', True)
+        return self.cmd.runCommand(command, shell = False, inputText = 
'\n'.join(files) + '\n')
+
+class JobQueueHandler(multiprocessing.Process):
+    def __init__(self, jQ, handler, verbose, dryrun):
+        multiprocessing.Process.__init__(self)
+        self.jQ = jQ
+        self.handler = handler
+        self.verbose = verbose
+        self.dryrun = dryrun
+
+    def run(self):
+        while True:
+            job = self.jQ.getJobOnQueue()
+            if not job: # no jobs left, we're done
+                break
+            self.doJob(job)
+
+    def doJob(self, job):
+        result = self.handler.doJob(job.contents)
+        if result:
+            job.markFailed()
+        else:
+            job.markDone()
+        self.jQ.notifyJobDone(job)
+
+class JobQueue(object):
+    def __init__(self, initialWorkerCount, handler, verbose, dryrun):
+        """create queue for jobs, plus specified
+        number of workers to read from the queue"""
+        self.handler = handler
+        self.verbose = verbose
+        self.dryrun = dryrun
+        # queue of jobs to be done (all the info needed, plus job id)
+        self.todoQueue = multiprocessing.Queue()
+
+        # queue to which workers write job ids of completed jobs
+        self.notifyQueue = multiprocessing.Queue()
+
+        # this 'job' on the queue means there are no more
+        # jobs. we put on of these on queue for each worker
+        self.endOfJobs = None
+
+        self._initialWorkerCount = workerCount
+        self._activeWorkers= []
+        if not self._initialWorkerCount:
+            self._initialWorkerCount = 1
+        if self.verbose or self.dryrun:
+            MirrorMsg.display( "about to start up %d workers:" % 
self._initialWorkerCount )
+        for i in xrange(0, self._initialWorkerCount):
+            w = JobQueueHandler(self, self.handler, self.verbose, self.dryrun)
+            w.start()
+            self._activeWorkers.append(w)
+            if self.verbose or self.dryrun:
+                MirrorMsg.display( '.', True)
+        if self.verbose or self.dryrun:
+            MirrorMsg.display( "done\n", True)
+
+    def getJobOnQueue(self):
+        # after 5 minutes of waiting around we decide that
+        # no one is ever going to put stuff on the queue
+        # again.  either the main process is done filling
+        # the queue or it died or hung
+
+        try:
+            job = self.todoQueue.get(timeout = 60)
+        except Empty: 
+            if self.verbose or self.dryrun:
+                MirrorMsg.display( "job todo queue was empty\n" )
+            return False
+
+        if (job == self.endOfJobs):
+            if self.verbose or self.dryrun:
+                MirrorMsg.display( "found jobs done marker on jobs queue\n" )
+            return False
+        else:
+            if self.verbose or self.dryrun:
+                MirrorMsg.display("retrieved from the job queue: %s\n" % 
job.jobId)
+            return job
+            
+    def notifyJobDone(self, job):
+        self.notifyQueue.put_nowait(job)
+
+    def addToJobQueue(self,job=None):
+        if (job):
+            self.todoQueue.put_nowait(job)
+
+    def setEndOfJobs(self):
+        """stuff 'None' on the queue, so that when
+        a worker reads this, it will clean up and exit"""
+        for i in xrange(0,self._initialWorkerCount):
+            self.todoQueue.put_nowait(self.endOfJobs)
+
+    def getJobFromNotifyQueue(self):
+        """see if any job has been put on
+        the notify queue (meaning that it has
+        been completed)"""
+        jobDone = False
+        # wait up to one minute.  after that we're pretty sure
+        # that if there are no active workers there are no more
+        # jobs that are going to get done either.
+        try:
+            jobDone = self.notifyQueue.get(timeout = 60)
+        except Empty:
+            if not self.getActiveWorkerCount():
+                return False
+        return jobDone
+
+    def getActiveWorkerCount(self):
+        self._activeWorkers = [ w for w in self._activeWorkers if w.is_alive() 
]
+        return len(self._activeWorkers)
+
+class Command(object):
+    def __init__(self, verbose, dryrun):
+        self.dryrun = dryrun
+        self.verbose = verbose
+
+    def runCommand(self, command, shell=False, inputText=False):
+        """Run a command, expecting no output. Raises MirrorError on 
+        non-zero return code."""
+
+        if type(command).__name__=="list":
+            commandString = " ".join(command)
+        else:
+            commandString = command
+        if (self.dryrun or self.verbose):
+            if self.dryrun:
+                MirrorMsg.display("would run %s\n" % commandString)
+                return
+            if self.verbose:
+                MirrorMsg.display("about to run %s\n" % commandString)
+
+        if inputText:
+            proc = Popen(command, shell = shell, stderr = PIPE, stdin = PIPE)
+        else:
+            proc = Popen(command, shell = shell, stderr = PIPE)
+
+        output, error = proc.communicate(inputText)
+        if output:
+            print output
+
+        if proc.returncode:
+            MirrorMsg.warn("command '%s failed with return code %s and error 
%s\n"
+                              % ( commandString, proc.returncode,  error ) )
+
+        # let the caller decide whether to bail or not
+        return proc.returncode
+
+class MirrorError(Exception):
+    pass
+
+class MirrorMsg(object):
+    def warn(message):
+        # maybe this should go to stderr. eh for now...
+        print "Warning:", os.getpid(), message
+        sys.stdout.flush()
+
+    def display(message, continuation = False):
+        # caller must add newlines to messages as desired
+        if continuation:
+            print message,
+        else:
+            print "Info: (%d) %s" % (os.getpid(), message),
+        sys.stdout.flush()
+
+    warn = staticmethod(warn)
+    display = staticmethod(display)
+
+class Mirror(object):
+    """reading directories for rsync from a specified file,
+    rsync each one; remove directories locally that aren't in the file"""
+
+    def __init__(self, hostName, remoteDirName, localDirName, rsyncList, 
rsyncArgs, maxFilesPerJob, maxDuPerJob, workerCount, skipDeletes, verbose, 
dryrun):
+        self.hostName = hostName
+        self.remoteDirName = remoteDirName
+        self.localDirName = localDirName
+        if self.hostName:
+            self.rsyncRemoteRoot = self.hostName + "::" + self.remoteDirName
+        else:
+            # the 'remote' dir is actually on the local host and we are
+            # rsyncing from one locally mounted filesystem to another
+            self.rsyncRemoteRoot = self.remoteDirName
+        self.rsyncFileList = rsyncList
+        self.rsyncArgs = rsyncArgs
+        self.verbose = verbose
+        self.dryrun = dryrun
+        self.maxFilesPerJob = maxFilesPerJob
+        self.maxDuPerJob = maxDuPerJob
+        self.workerCount = workerCount
+        self.skipDeletes = skipDeletes
+
+    def getFullLocalPath(self, relPath):
+        if relPath.startswith(os.sep):
+            relPath = relPath[len(os.sep):]
+        return(os.path.join(self.localDirName,relPath))
+
+    def getRsyncFileListing(self):
+        """via rsync, get full list of files for rsync from remote host"""
+        command = [ "/usr/bin/rsync", "-tp", self.rsyncRemoteRoot + '/' + 
self.rsyncFileList,  self.localDirName ]
+        # here we don't do a dry run, we will actually retrieve
+        # the list (because otherwise the rest of the run
+        # won't produce any information about what the run
+        # would do).  we will turn on verbosity though if
+        # dryrun was set
+        cmd = Command(self.verbose or self.dryrun, False)
+        result = cmd.runCommand(command, shell = False)
+        if result:
+            raise MirrorError("Failed to get list of files for rsync\n")
+
+    def processRsyncFileList(self):
+        f = open(self.getFullLocalPath(self.rsyncFileList))
+        if not f:
+            raise MirrorError("failed to open list of files for rsync", 
os.path.join(self.localDirName,self.rsyncFileList))
+        self.filesProcessor = RsyncFilesProcessor(f, self.maxFilesPerJob, 
self.maxDuPerJob, self.workerCount, self.rsyncRemoteRoot, self.localDirName, 
self.rsyncArgs, self.verbose, self.dryrun)
+        # create all jobs and put on todo queue
+        self.filesProcessor.stuffJobsOnQueue()
+        f.close()
+
+        # watch jobs get done and do post job cleanup after each one
+        if self.verbose or self.dryrun:
+            MirrorMsg.display("waiting for workers to process jobs\n")
+        self.filesProcessor.doPostJobProcessing(self.skipDeletes)
+
+    def setupDir(self,dirName):
+        if self.dryrun:
+           return
+ 
+        if os.path.exists(dirName):
+            if not os.path.isdir(dirName):
+                raise MirrorError("target directory name %s is not a 
directory, giving up" % dirName)
+        else:
+            os.makedirs(dirName)
+
+def usage(message = None):
+    if message:
+        print message
+        print "Usage: python wmfdumpsmirror.py [--hostname dumpserver] 
-remotedir dirpath"
+        print "              --localdir dirpath [--rsyncargs args] 
[--rsynclist filename]"
+        print "              [--filesperjob] [--sizeperjob] [--workercount] 
[--dryrun]"
+        print "              [--skipdeletes] [--verbose]"
+        print ""
+        print "This script does a continuous rsync from specified XML dumps 
rsync server,"
+        print "rsyncing the last N good dumps of each project and cleaning up 
old files."
+        print "The rsync is done on a list of files, not directories; bear 
this in mind"
+        print "when using the --rsyncargs option below.  The list of files 
should have"
+        print "been produced by rsync --list-only or be in the same format."
+        print ""
+        print "--hostname:     the name of the dump rsync server to contact"
+        print "                if this is left blank, the copy will be done 
from one path"
+        print "                to another on the local host"
+        print "--remotedir:   the remote path to the top of the dump directory 
tree"
+        print "                containing the mirror"
+        print "--localdir:     the full path to the top of the local directory 
tree"
+        print "                containing the mirror"
+        print "--rsyncargs:    arguments to be passed through to rsync, 
comma-separated,"
+        print "                with 'arg=value' for arguments that require a 
value"
+        print "                example:  --rsyncargs -tp,--bandwidth=10000"
+        print "                default: '-aq'"
+        print "--rsynclist:    the name of the list of dumps for rsync"
+        print "                default: rsync-list.txt.rsync"
+        print " --filesperjob: the maximum number of files to pass to a worker 
to process"
+        print "                at once"
+        print "                default: 1000"
+        print " --sizeperjob:  the maximum size of a batch of files to pass to 
a worker"
+        print "                to process at once (may be specified in K/M/G 
i.e. "
+        print "                kilobytes/megabytes/gigabytes; default is K) to 
a worker"
+        print "                to process at once"
+        print "                default: 500M"
+        print " --workercount: the number of worker processes to do 
simultaneous rsyncs"
+        print "                default: 1"
+        print " --dryrun:      don't do the rsync of files, just get the rsync 
file list"
+        print "                and print out what would be done"
+        print " --skipdeletes: copy or update files but don't delete anything"
+        print " --verbose:     print lots of diagnostic output"
+        print ""
+        print "Example: python wmfdumpsmirror.py --hostname 
dumps.wikimedia.org \\"
+        print "                --localdir /opt/data/dumps --rsyncfile 
rsync-list.txt.rsync"
+        sys.exit(1)
+
+def getSizeInBytes(value):
+    # expect digits optionally followed by one of 
+    # K M G; if not, then we assume K
+    sizePattern = re.compile('^([0-9]+)([K|M|G])?$')
+    result = sizePattern.search(value)
+    if not result:
+        usage("sizeperjob must be a positive integer optionally followed by 
one of 'K', 'M', 'G'")
+    size = int(result.group(1))
+    multiplier = result.group(2)
+    if multiplier == 'K' or multiplier == '':
+        size = size * 1000
+    elif multiplier == 'M':
+        size = size * 1000000
+    elif multiplier == 'G':
+        size = size * 1000000000
+    return size
+
+def getRsyncArgs(value):
+    # someday we should really check to make sure that
+    # args here make sense.  for now we shuck that job
+    # off to the user :-P
+    if not value:
+        return None
+    if ',' not in value:
+        return [ value ]
+    else:
+        return value.split(',')
+
+if __name__ == "__main__":
+    hostName = None
+    localDir = None
+    remoteDir = None
+    rsyncList = None
+    rsyncArgs = None
+    maxFilesPerJob = None
+    maxDuPerJob = None
+    workerCount = None
+    dryrun = False
+    skipDeletes = False
+    verbose = False
+    
+    try:
+        (options, remainder) = getopt.gnu_getopt(sys.argv[1:], "", 
["hostname=", "localdir=", "remotedir=", "rsynclist=",
+                          "rsyncargs=", "filesperjob=", "sizeperjob=", 
"workercount=", "dryrun", "skipdeletes", "verbose" ])
+    except:
+        usage("Unknown option specified")
+
+    for (opt, val) in options:
+        if opt == "--dryrun":
+            dryrun = True
+        elif opt == "--filesperjob":
+            if not val.isdigit():
+                usage("filesperjob must be a positive integer")
+            maxFilesPerJob = int(val)
+        elif opt == "--hostname":
+            hostName = val
+        elif opt == "--localdir":
+            localDir = val
+        elif opt == "--remotedir":
+            remoteDir = val
+        elif opt == "--rsynclist":
+            rsyncList = val
+        elif opt == "--rsyncargs":
+            rsyncArgs = getRsyncArgs(val)
+        elif opt == "--sizeperjob":
+            maxDuPerJob = getSizeInBytes(val)
+        elif opt == "--skipdeletes":
+            skipDeletes = True
+        elif opt == "--verbose":
+            verbose = True
+        elif opt == "--workercount":
+            if not val.isdigit():
+                usage("workercount must be a positive integer")
+            workerCount = int(val)
+
+    if len(remainder) > 0:
+        usage("Unknown option specified")
+
+    if not remoteDir or not localDir:
+        usage("Missing required option")
+
+    if not os.path.isdir(localDir):
+        usage("local rsync directory",localDir,"does not exist or is not a 
directory")
+
+    if not rsyncList:
+        rsyncList = "rsync-list.txt.rsync"
+
+    if not maxFilesPerJob:
+        maxFilesPerJob = 1000
+
+    if not maxDuPerJob:
+        maxDuPerJob = 500000000
+
+    if not workerCount:
+        workerCount = 1
+
+    if not rsyncArgs:
+        rsyncArgs = [ "-aq" ]
+
+    if remoteDir[-1] == '/':
+        remoteDir = remoteDir[:-1]
+
+    if localDir[-1] == '/':
+        localDir = localDir[:-1]
+
+    mirror = Mirror(hostName, remoteDir, localDir, rsyncList, rsyncArgs, 
maxFilesPerJob, maxDuPerJob, workerCount, skipDeletes, verbose, dryrun)
+
+    mirror.getRsyncFileListing()
+    mirror.processRsyncFileList()
diff --git a/manifests/misc/download.pp b/manifests/misc/download.pp
index cf35f81..4b7b9e7 100644
--- a/manifests/misc/download.pp
+++ b/manifests/misc/download.pp
@@ -7,14 +7,14 @@
                group  => root,
                path   => '/usr/local/bin/rsync-dumps.sh',
                source => 'puppet:///files/misc/scripts/rsync-dumps.sh';
-        }
+       }
 
        cron { 'rsync-dumps':
                ensure  => present,
                command => '/usr/local/bin/rsync-dumps.sh',
                user    => root,
                minute  => '0',
-                hour    => '*/2',
+               hour    => '*/2',
                require => File['/usr/local/bin/rsync-dumps.sh'];
        }
 }
@@ -55,7 +55,7 @@
                require => [ Package[nfs-kernel-server], File["/etc/exports"] ],
        }
 
-        include sysctlfile::high-bandwidth-rsync
+       include sysctlfile::high-bandwidth-rsync
 
        monitor_service { "lighttpd http": description => "Lighttpd HTTP", 
check_command => "check_http" }
        monitor_service { "nfs": description => "NFS", check_command => 
"check_tcp!2049" }
@@ -117,6 +117,7 @@
        }
 }
 
+
 class misc::download-mediawiki {
 
        system_role { "misc::download-mediawiki": description => "MediaWiki 
download" }
@@ -145,6 +146,46 @@
        apache_site { "download.mediawiki.org": name => 
"download.mediawiki.org" }
 }
 
+# Mounts the labs gluster public share and installs the cron job that rsyncs the XML dumps onto it.
+class misc::download-gluster {
+       include role::mirror::common
+       include generic::gluster-client
+
+       system_role { "misc::download-gluster": description => "Gluster dumps copy" }
+
+       mount {
+               '/mnt/glusterpublicdata':
+                       ensure  => mounted,
+                       device  => 'labstore1.pmtpa.wmnet:/publicdata-project',
+                       fstype  => 'glusterfs',
+                       options => 'defaults,_netdev=bond0,log-level=WARNING,log-file=/var/log/gluster.log',
+                       require => Package['glusterfs-client'],
+       }
+
+       file {
+               '/usr/local/bin/wmfdumpsmirror.py':  # mirror script called by gluster-rsync-cron.sh
+                       ensure => present,
+                       mode   => '0755',
+                       source => 'puppet:///files/mirror/wmfdumpsmirror.py';
+               '/usr/local/sbin/gluster-rsync-cron.sh':  # wrapper script that the cron job below runs
+                       mode   => '0755',
+                       source => 'puppet:///files/mirror/gluster-rsync-cron.sh',
+       }
+
+       cron {
+               'dumps_gluster_rsync':
+                       ensure      => present,
+                       user        => root,
+                       minute      => '50',
+                       hour        => '3',  # with minute '50': runs once a day at 03:50
+                       command     => '/usr/local/sbin/gluster-rsync-cron.sh',
+                       environment => 'MAILTO=ops-du...@wikimedia.org',
+                       require     => [ File[ ['/usr/local/bin/wmfdumpsmirror.py',
+                                       '/usr/local/sbin/gluster-rsync-cron.sh'] ], Mount['/mnt/glusterpublicdata'] ]
+       }
+}
+
+
 class misc::kiwix-mirror {
        # TODO: add system_role
 
diff --git a/manifests/site.pp b/manifests/site.pp
index fc1727e..5bf2b7f 100644
--- a/manifests/site.pp
+++ b/manifests/site.pp
@@ -432,7 +432,7 @@
         accounts::catrope,
         misc::download-wikimedia,
         misc::download-primary,
-                misc::download::cron-rsync-dumps,
+        misc::download::cron-rsync-dumps,
         misc::kiwix-mirror
 }
 
@@ -446,16 +446,8 @@
         accounts::catrope,
         misc::download-wikimedia,
         misc::download-mirror,
-                misc::download::cron-rsync-dumps,
-        generic::gluster-client
-        mount { "/mnt/glusterpublicdata":
-              device => "labstore1.pmtpa.wmnet:/publicdata-project",
-              fstype => "glusterfs",
-              options => 
"defaults,_netdev=bond0,log-level=WARNING,log-file=/var/log/gluster.log",
-              require => Package["glusterfs-client"],
-              ensure => mounted;
-        }
-
+        misc::download::cron-rsync-dumps,
+        misc::download-gluster
 }
 
 # pmtpa dbs

-- 
To view, visit https://gerrit.wikimedia.org/r/78043
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7fc957a6cc1042a937b6eb89ff0dbc44847ba19b
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to