The render module, originally a simple extract of the render_job function from the old daemon, has been rewritten to expose a more sensible API and to provide self-contained timeout support.
Two classes are now available, exposing the same public API for seamless use of any of the two: * JobRenderer is a simple, blocking job renderer. It can be threaded if the caller decides to call start() instead of run(). * TimingOutJobRenderer, as its name suggests, is a timing out job renderer that makes use of the threading capability of JobRenderer to handle a timeout on the rendering process and kill the rendering thread, whatever it is doing, if the given timeout is reached. The render module also now exposes a few public RESULT_ constants that can be used to identify the result of a job rendering. This is used in the daemon to infer an appropriate resultmsg. As a standalone process, the job renderer now takes an optional second argument: the timeout, in seconds. .../scripts/wrapper.py scripts/render.py <jobid> [timeout] As of now, the daemon is fully usable in production with the same level of functionality as before. Signed-off-by: Maxime Petazzoni <maxime.petazz...@bulix.org> --- scripts/daemon.py | 72 ++++++++++++--------- scripts/render.py | 187 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 179 insertions(+), 80 deletions(-) diff --git a/scripts/daemon.py b/scripts/daemon.py index 1639034..95d3327 100755 --- a/scripts/daemon.py +++ b/scripts/daemon.py @@ -23,29 +23,38 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -import time import sys import threading +import time import render from www.maposmatic.models import MapRenderingJob from www.settings import LOG from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_MAX_SIZE_GB -RESULT_SUCCESSFULL = 'ok' -RESULT_INTERRUPTED = 'rendering interrupted' -RESULT_FAILED = 'rendering failed' -RESULT_CANCELED = 'rendering took too long, canceled' +_DEFAULT_CLEAN_FREQUENCY = 20 # Clean thread polling frequency, in + # seconds. 
+_DEFAULT_POLL_FREQUENCY = 10 # Daemon job polling frequency, in seconds + +_RESULT_MSGS = { + render.RESULT_SUCCESS: 'ok', + render.RESULT_KEYBOARD_INTERRUPT: 'rendering interrupted', + render.RESULT_RENDERING_EXCEPTION: 'rendering failed', + render.RESULT_TIMEOUT_REACHED: 'rendering took too long, canceled' +} class MapOSMaticDaemon: """ This is a basic rendering daemon, base class for the different implementations of rendering scheduling. By default, it acts as a standalone, single-process MapOSMatic rendering daemon. + + It of course uses the TimingOutJobRenderer to ensure no long-lasting job + stalls the queue. """ - def __init__(self, frequency): - self.frequency = 10 + def __init__(self, frequency=_DEFAULT_POLL_FREQUENCY): + self.frequency = frequency LOG.info("MapOSMatic rendering daemon started.") self.rollback_orphaned_jobs() @@ -73,31 +82,32 @@ class MapOSMaticDaemon: LOG.info("MapOSMatic rendering daemon terminating.") def dispatch(self, job): - """Dispatch the given job. In this simple single-process daemon, this - is as simple as rendering it.""" - self.render(job) + """In this simple single-process daemon, dispatching is as easy as + calling the render() method. Subclasses probably want to overload this + method too and implement a more clever dispatching mechanism. + + Args: + job (MapRenderingJob): the job to process and render. - def render(self, job): - """Render the given job, handling the different rendering outcomes - (success or failures).""" + Returns True if the rendering was successful, False otherwise. + """ - LOG.info("Rendering job #%d '%s'..." % - (job.id, job.maptitle)) - job.start_rendering() + return self.render(job, 'maposmaticd_%d_' % os.getpid()) + + def render(self, job, prefix=None): + """Render the given job using a timing out job renderer. - ret = render.render_job(job) - if ret == 0: - msg = RESULT_SUCCESSFULL - LOG.info("Finished rendering of job #%d." 
% job.id) - elif ret == 1: - msg = RESULT_INTERRUPTED - LOG.info("Rendering of job #%d interrupted!" % job.id) - else: - msg = RESULT_FAILED - LOG.info("Rendering of job #%d failed (exception occurred)!" % - job.id) + Args: + job (MapRenderingJob): the job to process and render. + + Returns True if the rendering was successful, False otherwise. + """ - job.end_rendering(msg) + renderer = render.TimingOutJobRenderer(job, prefix=prefix) + job.start_rendering() + ret = renderer.run() + job.end_rendering(_RESULT_MSGS[ret]) + return ret == 0 class RenderingsGarbageCollector(threading.Thread): @@ -107,7 +117,7 @@ class RenderingsGarbageCollector(threading.Thread): of RENDERING_RESULT_MAX_SIZE_GB. """ - def __init__(self, frequency=20): + def __init__(self, frequency=_DEFAULT_CLEAN_FREQUENCY): threading.Thread.__init__(self) self.frequency = frequency @@ -217,8 +227,8 @@ if __name__ == '__main__': "Please use a valid RENDERING_RESULT_PATH.") sys.exit(1) - daemon = MapOSMaticDaemon(10) - cleaner = RenderingsGarbageCollector(20) + cleaner = RenderingsGarbageCollector() + daemon = MapOSMaticDaemon() cleaner.start() daemon.serve() diff --git a/scripts/render.py b/scripts/render.py index 32ae347..ea370f6 100755 --- a/scripts/render.py +++ b/scripts/render.py @@ -25,77 +25,166 @@ import Image import os import sys +import threading from ocitysmap.coords import BoundingBox from ocitysmap.street_index import OCitySMap from www.maposmatic.models import MapRenderingJob -from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS +from www.settings import LOG from www.settings import OCITYSMAP_CFG_PATH +from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS -def render_job(job, prefix=None): - """Renders the given job, encapsulating all processing errors and - exceptions. +RESULT_SUCCESS = 0 +RESULT_KEYBOARD_INTERRUPT = 1 +RESULT_RENDERING_EXCEPTION = 2 +RESULT_TIMEOUT_REACHED = 3 - This does not affect the job entry in the database in any way. 
It's the - responsibility of the caller to do maintain the job status in the database. +class TimingOutJobRenderer: + """ + The TimingOutJobRenderer is a wrapper around JobRenderer implementing + timeout management. It uses JobRenderer as a thread, and tries to join it + for the given timeout. If the timeout is reached, the thread is suspended, + cleaned up and killed. - Returns: - * 0 on success; - * 1 on ^C; - * 2 on a rendering exception from OCitySMap. + The TimingOutJobRenderer has exactly the same API as the non-threading + JobRenderer, so it can be used in place of JobRenderer very easily. """ - if job.administrative_city is None: - bbox = BoundingBox(job.lat_upper_left, job.lon_upper_left, - job.lat_bottom_right, job.lon_bottom_right) - renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH, - map_areas_prefix=prefix, - boundingbox=bbox, - language=job.map_language) - else: - renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH, - map_areas_prefix=prefix, - osmid=job.administrative_osmid, - language=job.map_language) - - prefix = os.path.join(RENDERING_RESULT_PATH, job.files_prefix()) + def __init__(self, job, timeout=1200, prefix=None): + """Initializes this TimingOutJobRenderer with a given job and a timeout. + + Args: + job (MapRenderingJob): the job to render. + timeout (int): a timeout, in seconds (defaults to 20 minutes). + prefix (string): renderer map_areas table prefix. + """ + + self.__timeout = timeout + self.__thread = JobRenderer(job, prefix) + + def run(self): + """Renders the job using a JobRendered, encapsulating all processing + errors and exceptions, with the addition here of a processing timeout. + + Returns one of the RESULT_ constants. + """ + + self.__thread.start() + self.__thread.join(self.__timeout) + + # If the thread is no longer alive, the timeout was not reached and all + # is well. + if not self.__thread.isAlive(): + return self.__thread.result + + LOG.info("Rendering of job #%d took too long (timeout reached)!" 
% + self.__thread.job.id) + + # Remove the job files + self.__thread.job.remove_all_files() + + # Kill the thread and return TIMEOUT_REACHED + del self.__thread + return RESULT_TIMEOUT_REACHED + +class JobRenderer(threading.Thread): + """ + A simple, blocking job rendered. It can be used as a thread, or directly in + the main processing path of the caller if it chooses to call run() + directly. + """ + + def __init__(self, job, prefix): + threading.Thread.__init__(self) + self.job = job + self.prefix = prefix + self.result = None + + def run(self): + """Renders the given job, encapsulating all processing errors and + exceptions. + + This does not affect the job entry in the database in any way. It's the + responsibility of the caller to do maintain the job status in the + database. + + Returns one of the RESULT_ constants. + """ + + LOG.info("Rendering job #%d '%s'..." % (self.job.id, self.job.maptitle)) + + if self.job.administrative_city is None: + bbox = BoundingBox(self.job.lat_upper_left, + self.job.lon_upper_left, + self.job.lat_bottom_right, + self.job.lon_bottom_right) + renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH, + map_areas_prefix=self.prefix, + boundingbox=bbox, + language=self.job.map_language) + else: + renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH, + map_areas_prefix=self.prefix, + osmid=self.job.administrative_osmid, + language=self.job.map_language) + + prefix = os.path.join(RENDERING_RESULT_PATH, self.job.files_prefix()) + + try: + # Render the map in all RENDERING_RESULT_FORMATS + result = renderer.render_map_into_files(self.job.maptitle, prefix, + RENDERING_RESULT_FORMATS, + 'zoom:16') + + # Render the index in all RENDERING_RESULT_FORMATS, using the + # same map size. 
+ renderer.render_index(self.job.maptitle, prefix, + RENDERING_RESULT_FORMATS, + result.width, result.height) + + # Create thumbnail + if 'png' in RENDERING_RESULT_FORMATS: + img = Image.open(prefix + '.png') + img.thumbnail((200, 200), Image.ANTIALIAS) + img.save(prefix + '_small.png') + + self.result = RESULT_SUCCESS + LOG.info("Finished rendering of job #%d." % self.job.id) + except KeyboardInterrupt: + self.result = RESULT_KEYBOARD_INTERRUPT + LOG.info("Rendering of job #%d interrupted!" % self.job.id) + except: + self.result = RESULT_RENDERING_EXCEPTION + LOG.info("Rendering of job #%d failed (exception occurred)!" % + self.job.id) + + # Remove the job files if the rendering was not successful. + if self.result: + self.job.remove_all_files() + + return self.result - try: - # Render the map in all RENDERING_RESULT_FORMATS - result = renderer.render_map_into_files(job.maptitle, prefix, - RENDERING_RESULT_FORMATS, - 'zoom:16') - - # Render the index in all RENDERING_RESULT_FORMATS, using the - # same map size. 
- renderer.render_index(job.maptitle, prefix, RENDERING_RESULT_FORMATS, - result.width, result.height) - - # Create thumbnail - if 'png' in RENDERING_RESULT_FORMATS: - img = Image.open(prefix + '.png') - img.thumbnail((200, 200), Image.ANTIALIAS) - img.save(prefix + '_small.png') - - return 0 - except KeyboardInterrupt: - return 1 - except: - return 2 if __name__ == '__main__': def usage(): - sys.stderr.write('usage: %s <jobid>' % sys.argv[0]) + sys.stderr.write('usage: %s <jobid> [timeout]\n' % sys.argv[0]) - if len(sys.argv) != 2: + if len(sys.argv) < 2 or len(sys.argv) > 3: usage() sys.exit(3) try: jobid = int(sys.argv[1]) job = MapRenderingJob.objects.get(id=jobid) + if job: - sys.exit(render_job(job, 'renderer_%d' % os.getpid())) + prefix = 'renderer_%d_' % os.getpid() + if len(sys.argv) == 3: + renderer = TimingOutJobRenderer(job, int(sys.argv[2]), prefix) + else: + renderer = JobRenderer(job, prefix) + + sys.exit(renderer.run()) else: sys.stderr.write('Job #%d not found!' % jobid) sys.exit(4) -- 1.6.3.3.277.g88938c