Mwalker has uploaded a new change for review. https://gerrit.wikimedia.org/r/149256
Change subject: Add a garbage collector to OCG ...................................................................... Add a garbage collector to OCG * Cleans up redis jobstatus instances after some period of time * Deletes files in the output, temp, and postmortem directories Change-Id: I7499c64b69bcfbdc4d7b870b9ae29c6bfa901d87 --- M defaults.js M lib/RedisWrapper.js A lib/threads/gc.js M mw-ocg-service.js A scripts/run-garbage-collect.js 5 files changed, 331 insertions(+), 3 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/Collection/OfflineContentGenerator refs/changes/56/149256/1 diff --git a/defaults.js b/defaults.js index 005695d..fafd07d 100644 --- a/defaults.js +++ b/defaults.js @@ -118,5 +118,22 @@ */ "logging": { "winston/transports/Console": { level: "info" } + }, + /** Garbage collection thread */ + "garbage_collection": { + /** Seconds between garbage collection runs */ + every: 0.25 * 24 * 60 * 60, + /** Lifetime, in seconds, of a job status object in redis */ + job_lifetime: 5 * 24 * 60 * 60, + /** Lifetime, in seconds, of any successful job artifacts on the file system */ + job_file_lifetime: 5.5 * 24 * 60 * 60, + /** Lifetime, in seconds, of a job status object that failed in redis */ + failed_job_lifetime: 24 * 60 * 60, + /** Lifetime, in seconds, of an object in the temp file system. Must be longer + * than the longest expected job runtime. (We check ctime not atime) + */ + temp_file_lifetime: 0.5 * 24 * 60 * 60, + /** Lifetime, in seconds, of an object in the post mortem directory. 
*/ + postmortem_file_lifetime: 5 * 24 * 60 * 60 } }; diff --git a/lib/RedisWrapper.js b/lib/RedisWrapper.js index 7c83977..0504dae 100644 --- a/lib/RedisWrapper.js +++ b/lib/RedisWrapper.js @@ -131,5 +131,8 @@ RedisWrapper.prototype.hlen = function( key ) { return Promise.promisify( this.client.hlen, false, this.client )( key ); }; +RedisWrapper.prototype.hscan = function( key, iterator ) { + return Promise.promisify( this.client.hscan, false, this.client )( key, iterator ); +}; module.exports = RedisWrapper; diff --git a/lib/threads/gc.js b/lib/threads/gc.js new file mode 100644 index 0000000..5b8e686 --- /dev/null +++ b/lib/threads/gc.js @@ -0,0 +1,215 @@ +"use strict"; +/** + * This file is part of the Collection Extension to MediaWiki + * https://www.mediawiki.org/wiki/Extension:Collection + * + * The garbage collection thread will + * - Delete JobStatus objects after `garbage_collection.job_lifetime` (`garbage_collection.failed_job_lifetime` for failed jobs) + * - Delete job files after `garbage_collection.job_file_lifetime` + * - Clear the temp folder after `garbage_collection.temp_file_lifetime` + * - Clear the postmortem folder after `garbage_collection.postmortem_file_lifetime` + * + * The thread will run every `garbage_collection.every` seconds + * + * @section LICENSE + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +require( 'es6-shim' ); +require( 'prfun' ); + +var fs = require( 'fs' ); +var path = require( 'path' ); +var rimraf = require( 'rimraf' ); + +var eh = require( '../errorhelper.js' ); +var jd = require( '../JobDetails.js' ); +var Redis = require( '../RedisWrapper.js' ); + +var config = null; +var running = false; +var intervalTimer = null; +var redisClient = null; +var redisIterator = 0; + +/* === Public Exported Functions =========================================== */ +/** + * Initialize the garbage collector with its configuration and create its redis client + * + * @param config_obj Configuration object + */ +function init( config_obj ) { + config = config_obj; + redisClient = new Redis( + config.redis.host, + config.redis.port, + config.redis.password + ); +} + +/** + * Starts the garbage collection thread. + */ +function startThread() { + running = true; + + redisClient.on( 'closed', function () { + if ( running ) { + console.error( 'Garbage collector connection to redis died, killing thread.', { + channel: 'gc.error.fatal' + } ); + stopThread( process.exit ); + } + } ); + redisClient.on('opened', function() { + intervalTimer = setInterval( doGCRun, config.garbage_collection.every * 1000 ); + } ); + redisClient.connect(); +} + +function doSingleRun() { + redisClient.on( 'closed', function () { + if ( running ) { + console.error( 'Garbage collector connection to redis died, killing thread.', { + channel: 'gc.error.fatal' + } ); + stopThread( process.exit ); + } + } ); + redisClient.connect(); + return new Promise( function( resolve, reject ) { + redisClient.on('opened', function() { + doGCRun().then( resolve ).catch( reject ); + } ); + } ); +} + +/** + * Marks the garbage collection thread as stopped and closes its redis connection + * + * @param callbackFunc Optional function to call shortly after the redis connection is closed + */ +function stopThread( callbackFunc ) { + running = false; + redisClient.close(); + if ( callbackFunc ) { setTimeout( callbackFunc, 1); } +} + +/* ==== The meat === */ 
+function doGCRun() { + var startTime = Date.now(); + + return Promise.resolve() + .then(clearJobStatusObjects) + .then(clearOutputDir) + .then(clearTempDir) + .then(clearPostmortemDir) + .then( function() { + console.info( "Finished GarbageCollection run in %s seconds", + ( ( Date.now() - startTime ) / 1000 ), { channel: 'gc' } ); + statsd.timing( 'gc.runtime', ( Date.now() - startTime ) / 1000 ); + } ); +} + +/** + * Iterate through all JobStatus objects in Redis and clear those + * that are too old. + */ +function clearJobStatusObjects() { + var clearedFailedJobs = 0, + clearedNonFailedJobs = 0; + + console.info( "Starting run to clear job status objects", { channel: 'gc' } ); + var iterateRedis = function( iterationResult ) { + var multi = redisClient.multi(); + var fjl = Date.now() - config.garbage_collection.failed_job_lifetime, + ajl = Date.now() - config.garbage_collection.job_lifetime; + + redisIterator = iterationResult[0]; + for ( var i = 0; i < iterationResult[1].length; i += 2 ) { + var job = jd.fromJson( iterationResult[1][ i+1 ] ); + if ( job.status === 'failed' && ( job.timestamp < fjl ) ) { + clearedFailedJobs += 1; + multi.hdel( config.redis.status_set_name, job.collectionId ); + } else if ( job.timestamp < ajl ) { + clearedNonFailedJobs += 1; + multi.hdel( config.redis.status_set_name, job.collectionId ); + } + } + + if ( redisIterator == 0 ) { + console.info( + "Cleared %s non-failed jobs and %s failed jobs", + clearedNonFailedJobs, + clearedFailedJobs, + { channel: 'gc' } + ); + } else { + return Promise.promisify( multi.exec, false, multi )() + .then( function() { + return multi.hscan( config.redis.status_set_name, redisIterator ); + } ) + .then( iterateRedis ); + } + }; + + return redisClient + .hlen( config.redis.status_set_name ) + .then( function() { + return redisClient.hscan( config.redis.status_set_name, redisIterator ); + } ) + .then( iterateRedis ); +} + +function cleanDir( dir, lifetime ) { + var expire = Date.now() - ( lifetime * 
1000 ); + var count = 0; + return Promise.promisify( fs.readdir, false, fs )( dir ).then( function( files ) { + return Promise.map( files, function( file ) { + return Promise.promisify( fs.stat, false, fs )( path.join( dir, file ) ) + .then( function ( stat ) { + if ( stat.ctime.getTime() < expire ) { + count += 1; + return Promise.promisify( rimraf )( path.join( dir, file ) ); + } + } ); + } ).then( function() { + console.info( 'Cleared %s files from %s', count, dir, { channel: 'gc' } ); + } ); + } ); +} + +function clearOutputDir() { + return cleanDir( config.backend.output_dir, config.garbage_collection.job_file_lifetime ); +} + +function clearTempDir() { + return cleanDir( config.backend.temp_dir, config.garbage_collection.temp_file_lifetime ); +} + +function clearPostmortemDir() { + return cleanDir( + config.backend.post_mortem_dir, + config.garbage_collection.postmortem_file_lifetime + ); +} + +exports.init = init; +exports.start = startThread; +exports.stop = stopThread; +exports.singleRun = doSingleRun; diff --git a/mw-ocg-service.js b/mw-ocg-service.js index 818a985..296612d 100755 --- a/mw-ocg-service.js +++ b/mw-ocg-service.js @@ -137,10 +137,9 @@ var workers = cluster.workers; Object.keys( workers ).forEach( function ( id ) { - console.info( 'Killing frontend worker %s', id ); + console.info( 'Killing worker %s', id ); workers[id].destroy(); - } - ); + } ); process.exit( 1 ); }; @@ -179,6 +178,8 @@ } ); + spawnWorker( 'gc' ); + for ( i = 0; i < config.coordinator.frontend_threads; i++ ) { spawnWorker( 'frontend' ); } diff --git a/scripts/run-garbage-collect.js b/scripts/run-garbage-collect.js new file mode 100644 index 0000000..26e9873 --- /dev/null +++ b/scripts/run-garbage-collect.js @@ -0,0 +1,92 @@ +#!/usr/bin/env node +"use strict"; + +/** + * Collection Extension garbage collection script + * + * This script will run a single run of the OCG garbage collector. 
+ * Normally you would not need to use this script as the OCG service + * will run the thread at a pre-scheduled interval. + * + * @section LICENSE + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +require( 'es6-shim' ); +require( 'prfun' ); + +var commander = require( 'commander' ); +var logger = require( 'winston' ); +var path = require( 'path' ); + +var StatsD = require( '../lib/statsd.js' ); + +/* === Configuration Options & File ======================================== */ +var config = require( '../defaults.js' ), configPath; +commander + .version( '0.0.1' ) + .option( '-c, --config <path>', 'Path to the local configuration file' ) + .parse( process.argv ); + +try { + configPath = commander.config; + if ( configPath ) { + if ( path.resolve( configPath ) !== path.normalize( configPath ) ) { + // If the configuration path given is relative, resolve it to be relative + // to the current working directory instead of relative to the path of this + // file. + configPath = path.resolve( process.cwd, configPath ); + } + config = require( configPath )( config ); + } +} catch ( err ) { + console.error( "Could not open configuration file %s! 
%s", configPath, err ); + process.exit( 1 ); +} + +/* === Initial Logging ===================================================== */ +// Remove the default logger +logger.remove( logger.transports.Console ); +// Now go through the hash and add all the required transports +for ( var transport in config.logging ) { + if ( config.logging.hasOwnProperty( transport ) ) { + var parts = transport.split( '/' ); + var classObj = require( parts.shift() ); + parts.forEach( function( k ) { + classObj = classObj[k]; + } ); + logger.add( classObj, config.logging[transport] ); + } +} +logger.extend( console ); + +global.statsd = new StatsD( + config.reporting.statsd_server, + config.reporting.statsd_port, + config.reporting.prefix, + '', + config.reporting.is_txstatsd, + false, // Don't globalize, we're doing that here + true, // Do cache DNS queries + config.reporting.enable +); + +/* === Do the deed ========================================================= */ +var gc = require( '../lib/threads/gc.js' ); +gc.init( config ); +gc.singleRun().then( function() { gc.stop( process.exit ); } ).done(); -- To view, visit https://gerrit.wikimedia.org/r/149256 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7499c64b69bcfbdc4d7b870b9ae29c6bfa901d87 Gerrit-PatchSet: 1 Gerrit-Project: mediawiki/extensions/Collection/OfflineContentGenerator Gerrit-Branch: master Gerrit-Owner: Mwalker <mwal...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits