Mwalker has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/149256

Change subject: Add a garbage collector to OCG
......................................................................

Add a garbage collector to OCG

* Cleans up redis jobstatus instances after some period of time
* Deletes files in the output, temp, and postmortem directories

Change-Id: I7499c64b69bcfbdc4d7b870b9ae29c6bfa901d87
---
M defaults.js
M lib/RedisWrapper.js
A lib/threads/gc.js
M mw-ocg-service.js
A scripts/run-garbage-collect.js
5 files changed, 331 insertions(+), 3 deletions(-)


  git pull 
ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/Collection/OfflineContentGenerator
 refs/changes/56/149256/1

diff --git a/defaults.js b/defaults.js
index 005695d..fafd07d 100644
--- a/defaults.js
+++ b/defaults.js
@@ -118,5 +118,22 @@
         */
        "logging": {
                "winston/transports/Console": { level: "info" }
+       },
+       /** Garbage collection thread */
+       "garbage_collection": {
+               /** Seconds between garbage collection runs */
+               every: 0.25 * 24 * 60 * 60,
+               /** Lifetime, in seconds, of a job status object in redis */
+               job_lifetime: 5 * 24 * 60 * 60,
+               /** Lifetime, in seconds, of any successful job artifacts on 
the file system */
+               job_file_lifetime: 5.5 * 24 * 60 * 60,
+               /** Lifetime, in seconds, of the job status object in redis for a failed job */
+               failed_job_lifetime: 24 * 60 * 60,
+               /** Lifetime, in seconds, of an object in the temp file system. 
Must be longer
+                * than the longest expected job runtime. (We check ctime not 
atime)
+                */
+               temp_file_lifetime: 0.5 * 24 * 60 * 60,
+               /** Lifetime, in seconds, of an object in the post mortem 
directory. */
+               postmortem_file_lifetime: 5 * 24 * 60 * 60
        }
 };
diff --git a/lib/RedisWrapper.js b/lib/RedisWrapper.js
index 7c83977..0504dae 100644
--- a/lib/RedisWrapper.js
+++ b/lib/RedisWrapper.js
@@ -131,5 +131,8 @@
 RedisWrapper.prototype.hlen = function( key ) {
        return Promise.promisify( this.client.hlen, false, this.client )( key );
 };
+RedisWrapper.prototype.hscan = function( key, iterator ) {
+       return Promise.promisify( this.client.hscan, false, this.client )( key, 
iterator );
+};
 
 module.exports = RedisWrapper;
diff --git a/lib/threads/gc.js b/lib/threads/gc.js
new file mode 100644
index 0000000..5b8e686
--- /dev/null
+++ b/lib/threads/gc.js
@@ -0,0 +1,215 @@
+"use strict";
+/**
+ * This file is part of the Collection Extension to MediaWiki
+ * https://www.mediawiki.org/wiki/Extension:Collection
+ *
+ * The garbage collection thread will
+ * - Delete JobStatus objects after `garbage_collection.job_lifetime`
+ *   (failed jobs after `garbage_collection.failed_job_lifetime`)
+ * - Delete job files after `garbage_collection.job_file_lifetime`
+ * - Clear the temp folder after `garbage_collection.temp_file_lifetime`
+ * - Clear the postmortem folder after `garbage_collection.postmortem_file_lifetime`
+ *
+ * The thread will run every `garbage_collection.every` seconds
+ *
+ * @section LICENSE
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+require( 'es6-shim' );
+require( 'prfun' );
+
+var fs = require( 'fs' );
+var path = require( 'path' );
+var rimraf = require( 'rimraf' );
+
+var eh = require( '../errorhelper.js' );
+var jd = require( '../JobDetails.js' );
+var Redis = require( '../RedisWrapper.js' );
+
+var config = null;
+var running = false;
+var intervalTimer = null;
+var redisClient = null;
+var redisIterator = 0;
+
+/* === Public Exported Functions =========================================== */
+/**
+ * Initialize the frontend server with global objects
+ *
+ * @param config_obj Configuration object
+ */
+function init( config_obj ) {
+       config = config_obj;
+       redisClient = new Redis(
+               config.redis.host,
+               config.redis.port,
+               config.redis.password
+       );
+}
+
+/**
+ * Starts the garbage collection thread.
+ */
+function startThread() {
+       running = true;
+
+       redisClient.on( 'closed', function () {
+               if ( running ) {
+                       console.error( 'Garbage collector connection to redis 
died, killing thread.', {
+                               channel: 'gc.error.fatal'
+                       } );
+                       stopThread( process.exit );
+               }
+       } );
+       redisClient.on('opened', function() {
+               intervalTimer = setInterval( doGCRun, 
config.garbage_collection.every * 1000 );
+       } );
+       redisClient.connect();
+}
+
+function doSingleRun() {
+       redisClient.on( 'closed', function () {
+               if ( running ) {
+                       console.error( 'Garbage collector connection to redis 
died, killing thread.', {
+                               channel: 'gc.error.fatal'
+                       } );
+                       stopThread( process.exit );
+               }
+       } );
+       redisClient.connect();
+       return new Promise( function( resolve, reject ) {
+               redisClient.on('opened', function() {
+                       doGCRun().then( resolve ).catch( reject );
+               } );
+       } );
+}
+
+/**
+ * Stops (closes) the frontend server
+ *
+ * @param callbackFunc Function to call when server successfully closed
+ */
+function stopThread( callbackFunc ) {
+       running = false;
+       redisClient.close();
+       if ( callbackFunc ) { setTimeout( callbackFunc, 1); }
+}
+
+/* ==== The meat === */
+function doGCRun() {
+       var startTime = Date.now();
+
+       return Promise.resolve()
+               .then(clearJobStatusObjects)
+               .then(clearOutputDir)
+               .then(clearTempDir)
+               .then(clearPostmortemDir)
+               .then( function() {
+                       console.info( "Finished GarbageCollection run in %s 
seconds",
+                               ( ( Date.now() - startTime ) / 1000 ), { 
channel: 'gc' } );
+                       statsd.timing( 'gc.runtime', ( Date.now() - startTime ) 
/ 1000 );
+               } );
+}
+
+/**
+ * Iterate through all JobStatus objects in Redis and clear those
+ * that are too old.
+ */
+function clearJobStatusObjects() {
+       var clearedFailedJobs = 0,
+               clearedNonFailedJobs = 0;
+
+       console.info( "Starting run to clear job status objects", { channel: 
'gc' } );
+       var iterateRedis = function( iterationResult ) {
+               var multi = redisClient.multi();
+               var fjl = Date.now() - 
config.garbage_collection.failed_job_lifetime,
+                       ajl = Date.now() - 
config.garbage_collection.job_lifetime;
+
+               redisIterator = iterationResult[0];
+               for ( var i = 0; i < iterationResult[1].length; i += 2 ) {
+                       var job = jd.fromJson( iterationResult[1][ i+1 ] );
+                       if ( job.status === 'failed' && ( job.timestamp < fjl ) 
) {
+                               clearedFailedJobs += 1;
+                               multi.hdel( config.redis.status_set_name, 
job.collectionId );
+                       } else if ( job.timestamp < ajl ) {
+                               clearedNonFailedJobs += 1;
+                               multi.hdel( config.redis.status_set_name, 
job.collectionId );
+                       }
+               }
+
+               if ( redisIterator == 0 ) {
+                       console.info(
+                               "Cleared %s non-failed jobs and %s failed jobs",
+                               clearedNonFailedJobs,
+                               clearedFailedJobs,
+                               { channel: 'gc' }
+                       );
+               } else {
+                       return Promise.promisify( multi.exec, false, multi )()
+                               .then( function() {
+                                       return multi.hscan( 
config.redis.status_set_name, redisIterator );
+                               } )
+                               .then( iterateRedis );
+               }
+       };
+
+       return redisClient
+               .hlen( config.redis.status_set_name )
+               .then( function() {
+                       return redisClient.hscan( config.redis.status_set_name, 
redisIterator );
+               } )
+               .then( iterateRedis );
+}
+
+function cleanDir( dir, lifetime ) {
+       var expire = Date.now() - ( lifetime * 1000 );
+       var count = 0;
+       return Promise.promisify( fs.readdir, false, fs )( dir ).then( 
function( files ) {
+               return Promise.map( files, function( file ) {
+                       return Promise.promisify( fs.stat, false, fs )( 
path.join( dir, file ) )
+                               .then( function ( stat ) {
+                                       if ( stat.ctime.getTime() < expire ) {
+                                               count += 1;
+                                               return Promise.promisify( 
rimraf )( path.join( dir, file ) );
+                                       }
+                               } );
+               } ).then( function() {
+                       console.info( 'Cleared %s files from %s', count, dir, { 
channel: 'gc' } );
+               } );
+       } );
+}
+
+function clearOutputDir() {
+       return cleanDir( config.backend.output_dir, 
config.garbage_collection.job_file_lifetime );
+}
+
+function clearTempDir() {
+       return cleanDir( config.backend.temp_dir, 
config.garbage_collection.temp_file_lifetime );
+}
+
+function clearPostmortemDir() {
+       return cleanDir(
+               config.backend.post_mortem_dir,
+               config.garbage_collection.postmortem_file_lifetime
+       );
+}
+
+exports.init = init;
+exports.start = startThread;
+exports.stop = stopThread;
+exports.singleRun = doSingleRun;
diff --git a/mw-ocg-service.js b/mw-ocg-service.js
index 818a985..296612d 100755
--- a/mw-ocg-service.js
+++ b/mw-ocg-service.js
@@ -137,10 +137,9 @@
 
                var workers = cluster.workers;
                Object.keys( workers ).forEach( function ( id ) {
-                       console.info( 'Killing frontend worker %s', id );
+                       console.info( 'Killing worker %s', id );
                        workers[id].destroy();
-               }
-               );
+               } );
                process.exit( 1 );
        };
 
@@ -179,6 +178,8 @@
        }
        );
 
+       spawnWorker( 'gc' );
+
        for ( i = 0; i < config.coordinator.frontend_threads; i++ ) {
                spawnWorker( 'frontend' );
        }
diff --git a/scripts/run-garbage-collect.js b/scripts/run-garbage-collect.js
new file mode 100644
index 0000000..26e9873
--- /dev/null
+++ b/scripts/run-garbage-collect.js
@@ -0,0 +1,92 @@
+#!/usr/bin/env node
+"use strict";
+
+/**
+ * Collection Extension garbage collection script
+ *
+ * This script will run a single run of the OCG garbage collector.
+ * Normally you would not need to use this script as the OCG service
+ * will run the thread at a pre-scheduled interval.
+ *
+ * @section LICENSE
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+require( 'es6-shim' );
+require( 'prfun' );
+
+var commander = require( 'commander' );
+var logger = require( 'winston' );
+var path = require( 'path' );
+
+var StatsD = require( '../lib/statsd.js' );
+
+/* === Configuration Options & File ======================================== */
+var config = require( '../defaults.js' ), configPath;
+commander
+       .version( '0.0.1' )
+       .option( '-c, --config <path>', 'Path to the local configuration file' )
+       .parse( process.argv );
+
+try {
+       configPath = commander.config;
+       if ( configPath ) {
+               if ( path.resolve( configPath ) !== path.normalize( configPath 
) ) {
+                       // If the configuration path given is relative, resolve 
it to be relative
+                       // to the current working directory instead of relative 
to the path of this
+                       // file.
+                       configPath = path.resolve( process.cwd, configPath );
+               }
+               config = require( configPath )( config );
+       }
+} catch ( err ) {
+       console.error( "Could not open configuration file %s! %s", configPath, 
err );
+       process.exit( 1 );
+}
+
+/* === Initial Logging ===================================================== */
+// Remove the default logger
+logger.remove( logger.transports.Console );
+// Now go through the hash and add all the required transports
+// Each key is a '/'-separated path such as "winston/transports/Console": the
+// first segment is require()d and every remaining segment is a property
+// lookup on the result. The value is handed to logger.add as the options.
+for ( var transport in config.logging ) {
+       if ( config.logging.hasOwnProperty( transport ) ) {
+               var parts = transport.split( '/' );
+               var classObj = require( parts.shift() );
+               parts.forEach( function( k ) {
+                       classObj = classObj[k];
+               } );
+               logger.add( classObj, config.logging[transport] );
+       }
+}
+// NOTE(review): presumably routes the console.* logging calls through the
+// configured transports - confirm against the winston version in use.
+logger.extend( console );
+
+// Published as a global because lib/threads/gc.js calls `statsd.timing`
+// without importing anything.
+global.statsd = new StatsD(
+       config.reporting.statsd_server,
+       config.reporting.statsd_port,
+       config.reporting.prefix,
+       '',
+       config.reporting.is_txstatsd,
+       false, // Don't globalize, we're doing that here
+       true,  // Do cache DNS queries
+       config.reporting.enable
+);
+
+/* === Do the deed ========================================================= */
+// Perform a single collection run, then stop the collector; process.exit is
+// passed as the stop callback so the script terminates afterwards.
+var gc = require( '../lib/threads/gc.js' );
+gc.init( config );
+gc.singleRun().then( function() { gc.stop( process.exit ); } ).done();

-- 
To view, visit https://gerrit.wikimedia.org/r/149256
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I7499c64b69bcfbdc4d7b870b9ae29c6bfa901d87
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/Collection/OfflineContentGenerator
Gerrit-Branch: master
Gerrit-Owner: Mwalker <mwal...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to