Antony Lesuisse (OpenERP) has proposed merging lp:~openerp-dev/openobject-server/6.1-multicorn-al into lp:openobject-server/6.1.
Requested reviews:
  OpenERP Core Team (openerp)

For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/6.1-multicorn-al/+merge/109544
-- 
https://code.launchpad.net/~openerp-dev/openobject-server/6.1-multicorn-al/+merge/109544
Your team OpenERP R&D Team is subscribed to branch lp:~openerp-dev/openobject-server/6.1-multicorn-al.
=== removed file 'gunicorn.conf.py' --- gunicorn.conf.py 2012-03-23 13:19:32 +0000 +++ gunicorn.conf.py 1970-01-01 00:00:00 +0000 @@ -1,62 +0,0 @@ -# Gunicorn sample configuration file. -# See http://gunicorn.org/configure.html for more details. -# -# To run the OpenERP server via Gunicorn, change the appropriate -# settings below, in order to provide the parameters that -# would normally be passed in the command-line, -# (at least `bind` and `conf['addons_path']`), then execute: -# $ gunicorn openerp:wsgi.core.application -c gunicorn.conf.py -# or if you want to run it behind a reverse proxy, add the line -# import openerp.wsgi.proxied -# in this file and execute: -# $ gunicorn openerp:wsgi.proxied.application -c gunicorn.conf.py - -import openerp - -# Standard OpenERP XML-RPC port is 8069 -bind = '127.0.0.1:8069' - -pidfile = '.gunicorn.pid' - -# Gunicorn recommends 2-4 x number_of_cpu_cores, but -# you'll want to vary this a bit to find the best for your -# particular work load. -workers = 4 - -# Some application-wide initialization is needed. 
-on_starting = openerp.wsgi.core.on_starting -pre_request = openerp.wsgi.core.pre_request -post_request = openerp.wsgi.core.post_request - -# openerp request-response cycle can be quite long for -# big reports for example -timeout = 240 - -max_requests = 2000 - -# Equivalent of --load command-line option -openerp.conf.server_wide_modules = ['web'] - -# internal TODO: use openerp.conf.xxx when available -conf = openerp.tools.config - -# Path to the OpenERP Addons repository (comma-separated for -# multiple locations) -conf['addons_path'] = '/home/openerp/addons/trunk,/home/openerp/web/trunk/addons' - -# Optional database config if not using local socket -#conf['db_name'] = 'mycompany' -#conf['db_host'] = 'localhost' -#conf['db_user'] = 'foo' -#conf['db_port'] = 5432 -#conf['db_password'] = 'secret' - -# OpenERP Log Level -# DEBUG=10, DEBUG_RPC=8, DEBUG_RPC_ANSWER=6, DEBUG_SQL=5, INFO=20, -# WARNING=30, ERROR=40, CRITICAL=50 -# conf['log_level'] = 20 - -# If --static-http-enable is used, path for the static web directory -#conf['static_http_document_root'] = '/var/www' - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: === removed file 'openerp-cron-worker' --- openerp-cron-worker 2012-05-30 08:12:58 +0000 +++ openerp-cron-worker 1970-01-01 00:00:00 +0000 @@ -1,111 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -OpenERP cron jobs worker - -This script executes OpenERP cron jobs. Normally, cron jobs are handled by the -OpenERP server but depending on deployment needs, independent worker processes -can be used. This is especially the case when the server is run via Gunicorn. - -OpenERP cron jobs worker re-uses openerp-server command-line options but does -not honor all of them. - -Meaningful options include: - - -d, --database comma-separated list of databases to monitor for cron jobs - processing. If left empty, the worker monitors all databases - (given by `psql -ls`). - - --addons-path as ususal. 
- - --cpu-time-limit - --virtual-memory-limit - --virtual-memory-reset Those three options have the same meaning the for - the server with Gunicorn. The only catch is: To - not enable rlimits by default, those options are - honored only when --cpu-time-limte is different than - 60 (its default value). -""" - -import logging -import os -import signal -import sys - -import openerp - -# Also use the `openerp` logger for the main script. -_logger = logging.getLogger('openerp') - -# Variable keeping track of the number of calls to the signal handler defined -# below. This variable is monitored by ``quit_on_signals()``. -quit_signals_received = 0 - -# TODO copy/pasted from openerp-server -def signal_handler(sig, frame): - """ Signal handler: exit ungracefully on the second handled signal. - - :param sig: the signal number - :param frame: the interrupted stack frame or None - """ - global quit_signals_received - quit_signals_received += 1 - import openerp.addons.base - openerp.addons.base.ir.ir_cron.quit_signal_received = True - if quit_signals_received == 1 and openerp.addons.base.ir.ir_cron.job_in_progress: - _logger.info("Waiting for the current job to complete.") - print "Waiting for the current job to complete." - print "Hit Ctrl-C again to force shutdown." - if quit_signals_received > 1: - # logging.shutdown was already called at this point. - sys.stderr.write("Forced shutdown.\n") - os._exit(0) - -# TODO copy/pasted from openerp-server -def setup_signal_handlers(): - """ Register the signal handler defined above. 
""" - SIGNALS = map(lambda x: getattr(signal, "SIG%s" % x), "INT TERM".split()) - if os.name == 'posix': - map(lambda sig: signal.signal(sig, signal_handler), SIGNALS) - elif os.name == 'nt': - import win32api - win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1) - -def list_databases(): - import subprocess - p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE) - p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE) - p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. - output = p2.communicate()[0] - databases = output.splitlines() - # TODO filter out non-OpenERP databases - databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']] - databases = [d for d in databases if not d.startswith('postgres')] - return databases - -if __name__ == '__main__': - os.environ['TZ'] = 'UTC' - openerp.tools.config.parse_config(sys.argv[1:]) - config = openerp.tools.config - if config['log_handler'] == [':INFO']: - # Replace the default value, which is suitable for openerp-server. - config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG') - setup_signal_handlers() - openerp.modules.module.initialize_sys_path() - openerp.modules.loading.open_openerp_namespace() - openerp.netsvc.init_logger() - openerp.cron.enable_schedule_wakeup = False - openerp.multi_process = True # enable multi-process signaling - import openerp.addons.base - print "OpenERP cron jobs worker. Hit Ctrl-C to exit." - print "Documentation is available at the top of the `opener-cron-worker` file." - if config['db_name']: - db_names = config['db_name'].split(',') - print "Monitoring %s databases." % len(db_names) - else: - db_names = list_databases - print "Monitored databases are auto-discovered." 
- openerp.addons.base.ir.ir_cron.ir_cron._run(db_names) - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: === modified file 'openerp-server' --- openerp-server 2012-03-29 13:20:28 +0000 +++ openerp-server 2012-06-12 21:44:18 +0000 @@ -111,7 +111,6 @@ except Exception: _logger.exception('Failed to initialize database `%s` and run test file `%s`.', dbname, test_file) - def export_translation(): config = openerp.tools.config dbname = config['db_name'] @@ -201,6 +200,7 @@ except KeyboardInterrupt, e: pass + config = openerp.tools.config if config['pidfile']: os.unlink(config['pidfile']) @@ -213,8 +213,7 @@ import babel babel.localedata._dirname = os.path.join(os.path.dirname(sys.executable), 'localedata') -if __name__ == "__main__": - +def main(): os.environ["TZ"] = "UTC" check_root_user() @@ -243,20 +242,13 @@ sys.exit(0) if not config["stop_after_init"]: + setup_pid_file() # Some module register themselves when they are loaded so we need the # services to be running before loading any registry. - openerp.service.start_services() - - for m in openerp.conf.server_wide_modules: - try: - openerp.modules.module.load_openerp_module(m) - except Exception: - msg = '' - if m == 'web': - msg = """ -The `web` module is provided by the addons found in the `openerp-web` project. 
-Maybe you forgot to add those addons in your addons_path configuration.""" - _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) + if config['multiprocess']: + openerp.service.start_services_multi() + else: + openerp.service.start_services() if config['db_name']: for dbname in config['db_name'].split(','): @@ -265,8 +257,10 @@ if config["stop_after_init"]: sys.exit(0) - setup_pid_file() _logger.info('OpenERP server is running, waiting for connections...') quit_on_signals() +if __name__ == "__main__": + main() + # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: === modified file 'openerp/addons/base/ir/ir_cron.py' --- openerp/addons/base/ir/ir_cron.py 2012-04-02 12:01:58 +0000 +++ openerp/addons/base/ir/ir_cron.py 2012-06-12 21:44:18 +0000 @@ -39,14 +39,6 @@ _logger = logging.getLogger(__name__) -# This variable can be set by a signal handler to stop the infinite loop in -# ir_cron._run() -quit_signal_received = False - -# This variable can be checked to know if ir_cron._run() is processing a job or -# sleeping. -job_in_progress = True - def str2tuple(s): return eval('tuple(%s)' % (s or '')) @@ -373,54 +365,6 @@ return False - @classmethod - def _run(cls, db_names): - """ - Class method intended to be run in a dedicated process to handle jobs. - This polls the database for jobs that can be run every 60 seconds. - - :param db_names: list of database names to poll or callable to - generate such a list. - """ - global quit_signal_received - while not quit_signal_received: - if callable(db_names): - names = db_names() - else: - names = db_names - for x in xrange(5): - if quit_signal_received: - return - t1 = time.time() - for db_name in names: - while True: - # Small hack to re-use the openerp-server config: - # If the cpu_time_limit has not its default value, we - # truly want to establish limits. 
- if openerp.tools.config['cpu_time_limit'] != 60: - openerp.wsgi.core.pre_request('dummy', 'dummy') - acquired = cls._acquire_job(db_name) - if openerp.tools.config['cpu_time_limit'] != 60: - class W(object): - alive = True - worker = W() - openerp.wsgi.core.post_request(worker, 'dummy', 'dummy') - if not worker.alive: - return - if not acquired: - break - if quit_signal_received: - return - t2 = time.time() - t = t2 - t1 - global job_in_progress - if t > 60: - _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t)) - else: - job_in_progress = False - time.sleep(60 - t) - job_in_progress = True - def update_running_cron(self, cr): """ Schedule as soon as possible a wake-up for this database. """ # Verify whether the server is already started and thus whether we need to commit === modified file 'openerp/service/__init__.py' --- openerp/service/__init__.py 2012-03-29 11:55:31 +0000 +++ openerp/service/__init__.py 2012-06-12 21:44:18 +0000 @@ -48,20 +48,30 @@ _logger = logging.getLogger(__name__) -def start_services(): - """ Start all services. - - Services include the different servers and cron threads. - - """ + +def start_internal(): # Instantiate local services (this is a legacy design). openerp.osv.osv.start_object_proxy() # Export (for RPC) services. web_services.start_web_services() +def start_server_wide_modules(): + for m in openerp.conf.server_wide_modules: + try: + openerp.modules.module.load_openerp_module(m) + except Exception: + msg = '' + if m == 'web': + msg = """ +The `web` module is provided by the addons found in the `openerp-web` project. +Maybe you forgot to add those addons in your addons_path configuration.""" + _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) + +def start_services(): + """ Start all services including http, netrpc and cron """ + start_internal() + # Initialize the HTTP stack. 
- #http_server.init_servers() - #http_server.init_static_http() netrpc_server.init_servers() # Start the main cron thread. @@ -73,6 +83,18 @@ # Start the WSGI server. openerp.wsgi.core.start_server() + start_server_wide_modules() + +def start_services_multi(): + openerp.multi_process = True # Yay! + start_internal() + + openerp.modules.module.initialize_sys_path() + openerp.modules.loading.open_openerp_namespace() + + start_server_wide_modules() + + openerp.wsgi.core.Multicorn(openerp.wsgi.core.application).run() def stop_services(): """ Stop all services. """ === modified file 'openerp/tools/config.py' --- openerp/tools/config.py 2012-05-30 06:42:37 +0000 +++ openerp/tools/config.py 2012-06-12 21:44:18 +0000 @@ -258,8 +258,6 @@ # Advanced options group = optparse.OptionGroup(parser, "Advanced options") - group.add_option("--cache-timeout", dest="cache_timeout", my_default=100000, - help="set the timeout for the cache system", type="int") group.add_option('--debug', dest='debug_mode', action='store_true', my_default=False, help='enable debug mode') group.add_option("--stop-after-init", action="store_true", dest="stop_after_init", my_default=False, help="stop the server after its initialization") @@ -277,23 +275,30 @@ group.add_option("--max-cron-threads", dest="max_cron_threads", my_default=4, help="Maximum number of threads processing concurrently cron jobs.", type="int") - # TODO sensible default for the three following limits. - group.add_option("--virtual-memory-limit", dest="virtual_memory_limit", my_default=768 * 1024 * 1024, - help="Maximum allowed virtual memory per Gunicorn process. " - "When the limit is reached, any memory allocation will fail.", - type="int") - group.add_option("--virtual-memory-reset", dest="virtual_memory_reset", my_default=640 * 1024 * 1024, - help="Maximum allowed virtual memory per Gunicorn process. 
" - "When the limit is reached, the worker will be reset after " - "the current request.", - type="int") - group.add_option("--cpu-time-limit", dest="cpu_time_limit", my_default=60, - help="Maximum allowed CPU time per Gunicorn process. " - "When the limit is reached, an exception is raised.", - type="int") group.add_option("--unaccent", dest="unaccent", my_default=False, action="store_true", help="Use the unaccent function provided by the database when available.") + parser.add_option_group(group) + group = optparse.OptionGroup(parser, "Multiprocessing options") + # TODO sensible default for the three following limits. + group.add_option("--multiprocess", dest="multiprocess", my_default=0, + help="Specify the number of workers, 0 disable multiprocessing.", + type="int") + group.add_option("--limit-memory-soft", dest="limit_memory_soft", my_default=640 * 1024 * 1024, + help="Maximum allowed virtual memory per worker, when reached the worker be reset after the current request.", + type="int") + group.add_option("--limit-memory-hard", dest="limit_memory_hard", my_default=768 * 1024 * 1024, + help="Maximum allowed virtual memory per worker, when reached, any memory allocation will fail.", + type="int") + group.add_option("--limit-time-cpu", dest="limit_time_cpu", my_default=60, + help="Maximum allowed CPU time per request.", + type="int") + group.add_option("--limit-time-real", dest="limit_time_real", my_default=60, + help="Maximum allowed Real time per request. ", + type="int") + group.add_option("--limit-request", dest="limit_request", my_default=8192, + help="Maximum number of request to be processed per worker.", + type="int") parser.add_option_group(group) # Copy all optparse options (i.e. MyOption) into self.options. 
@@ -375,7 +380,7 @@ # if defined dont take the configfile value even if the defined value is None keys = ['xmlrpc_interface', 'xmlrpc_port', 'db_name', 'db_user', 'db_password', 'db_host', - 'db_port', 'db_template', 'logfile', 'pidfile', 'smtp_port', 'cache_timeout', + 'db_port', 'db_template', 'logfile', 'pidfile', 'smtp_port', 'email_from', 'smtp_server', 'smtp_user', 'smtp_password', 'netrpc_interface', 'netrpc_port', 'db_maxconn', 'import_partial', 'addons_path', 'netrpc', 'xmlrpc', 'syslog', 'without_demo', 'timezone', @@ -404,8 +409,8 @@ 'stop_after_init', 'logrotate', 'without_demo', 'netrpc', 'xmlrpc', 'syslog', 'list_db', 'xmlrpcs', 'proxy_mode', 'test_file', 'test_disable', 'test_commit', 'test_report_directory', - 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', - 'virtual_memory_limit', 'virtual_memory_reset', 'cpu_time_limit', 'unaccent', + 'osv_memory_count_limit', 'osv_memory_age_limit', 'max_cron_threads', 'unaccent', + 'multiprocess', 'limit_memory_hard', 'limit_memory_soft', 'limit_time_cpu', 'limit_time_real', 'limit_request' ] for arg in keys: === modified file 'openerp/wsgi/core.py' --- openerp/wsgi/core.py 2012-05-30 08:12:58 +0000 +++ openerp/wsgi/core.py 2012-06-12 21:44:18 +0000 @@ -457,61 +457,353 @@ if httpd: httpd.shutdown() -# Master process id, can be used for signaling. -arbiter_pid = None - -# Application setup before we can spawn any worker process. -# This is suitable for e.g. gunicorn's on_starting hook. -def on_starting(server): - global arbiter_pid - arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable - openerp.multi_process = True # Yay! 
- openerp.netsvc.init_logger() - openerp.osv.osv.start_object_proxy() - openerp.service.web_services.start_web_services() - openerp.modules.module.initialize_sys_path() - openerp.modules.loading.open_openerp_namespace() - for m in openerp.conf.server_wide_modules: - try: - openerp.modules.module.load_openerp_module(m) - except Exception: - msg = '' - if m == 'web': - msg = """ -The `web` module is provided by the addons found in the `openerp-web` project. -Maybe you forgot to add those addons in your addons_path configuration.""" - _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) - -# Install limits on virtual memory and CPU time consumption. -def pre_request(worker, req): - import os - import psutil - import resource - import signal - # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space - rss, vms = psutil.Process(os.getpid()).get_memory_info() - soft, hard = resource.getrlimit(resource.RLIMIT_AS) - resource.setrlimit(resource.RLIMIT_AS, (config['virtual_memory_limit'], hard)) - - r = resource.getrusage(resource.RUSAGE_SELF) - cpu_time = r.ru_utime + r.ru_stime - signal.signal(signal.SIGXCPU, time_expired) - soft, hard = resource.getrlimit(resource.RLIMIT_CPU) - resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['cpu_time_limit'], hard)) - -# Reset the worker if it consumes too much memory (e.g. caused by a memory leak). -def post_request(worker, req): - import os - import psutil - rss, vms = psutil.Process(os.getpid()).get_memory_info() - if vms > config['virtual_memory_reset']: - _logger.info('Virtual memory consumption ' - 'too high, killing the worker.') - worker.alive = False # Commit suicide after the request. - -# SIGXCPU (exceeded CPU time) signal handler will raise an exception. 
-def time_expired(n, stack): - _logger.info('CPU time limit exceeded.') - raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception +#----------------------------------------------------------- +# Multicorn, multiprocessing inspired by gunicorn +# Files to be moved: +# wsgi/core.py -> service/wsgi.py +# this part -> service/multi.py +#----------------------------------------------------------- +import fcntl, psutil, random, resource, select, socket, time +import werkzeug.serving + +class Multicorn(object): + """ Multiprocessing inspired by (g)unicorn. + Multicorn currently uses accept(2) as dispatching method between workers + but we plan to replace it by a more intelligent dispatcher to will parse + the first HTTP request line. + """ + def __init__(self, app): + # config + self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port']) + self.population = config['multiprocess'] + self.timeout = config['limit_time_real'] + self.limit_request = config['limit_request'] + # working vars + self.beat = 4 + self.app = app + self.pid = os.getpid() + self.socket_listen = None + self.workers_http = {} + self.workers_cron = {} + self.workers = {} + self.generation = 0 + self.queue = [] + + def pipe_new(self): + pipe = os.pipe() + for fd in pipe: + # non_blocking + flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + # close_on_exec + flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + return pipe + + def pipe_ping(self, pipe): + try: + os.write(pipe[1], '.') + except IOError, e: + if e.errno not in [errno.EAGAIN, errno.EINTR]: + raise + + def signal_handler(self, sig, frame): + if len(self.queue) < 5 or sig == signal.SIGCHLD: + self.queue.append(sig) + self.pipe_ping(self.pipe) + else: + _logger.warn("Dropping signal: %s", sig) + + def worker_spawn(self, klass, workers_registry): + self.generation += 1 + worker = klass(self) + pid = os.fork() + if pid 
!= 0: + worker.pid = pid + self.workers[pid] = worker + workers_registry[pid] = worker + return worker + else: + worker.run() + sys.exit(0) + + def worker_pop(self, pid): + if pid in self.workers: + _logger.debug("Worker (%s) unregistered",pid) + try: + self.workers_http.pop(pid,None) + self.workers_cron.pop(pid,None) + u = self.workers.pop(pid) + u.close() + except OSError: + return + + def worker_kill(self, pid, sig): + try: + os.kill(pid, sig) + except OSError, e: + if e.errno == errno.ESRCH: + self.worker_pop(pid) + + def process_signals(self): + while len(self.queue): + sig = self.queue.pop(0) + if sig in [signal.SIGINT,signal.SIGTERM]: + raise KeyboardInterrupt + + def process_zombie(self): + # reap dead workers + while 1: + try: + wpid, status = os.waitpid(-1, os.WNOHANG) + if not wpid: + break + if (status >> 8) == 3: + msg = "Critial worker error (%s)" + _logger.critical(msg, wpid) + raise Exception(msg % wpid) + self.worker_pop(wpid) + except OSError, e: + if e.errno == errno.ECHILD: + break + raise + + def process_timeout(self): + now = time.time() + for (pid, worker) in self.workers.items(): + if now - worker.watchdog_time >= worker.watchdog_timeout: + _logger.error("Worker (%s) timeout", pid) + self.worker_kill(pid, signal.SIGKILL) + + def process_spawn(self): + while len(self.workers_http) < self.population: + self.worker_spawn(UnicornHTTP, self.workers_http) + while len(self.workers_cron) < 1: # config option ? 
+ self.worker_spawn(Unicron, self.workers_cron) + + def sleep(self): + try: + # map of fd -> worker + fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()]) + fd_in = fds.keys() + [self.pipe[0]] + # check for ping or internal wakeups + ready = select.select(fd_in, [], [], self.beat) + # update worker watchdogs + for fd in ready[0]: + if fd in fds: + fds[fd].watchdog_time = time.time() + try: + # empty pipe + while os.read(fd, 1): + pass + except OSError, e: + if e.errno not in [errno.EAGAIN]: + raise + except select.error, e: + if e[0] not in [errno.EINTR]: + raise + + def start(self): + # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted + # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the + # signal handler to overcome this behaviour + self.pipe = self.pipe_new() + # set signal + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + signal.signal(signal.SIGCHLD, self.signal_handler) + # listen to socket + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.setblocking(0) + self.socket.bind(self.address) + self.socket.listen(8) + + def run(self): + self.start() + _logger.debug("Multiprocess starting") + while 1: + try: + #_logger.debug("Multiprocess beat (%s)",time.time()) + self.process_signals() + self.process_zombie() + self.process_timeout() + self.process_spawn() + self.sleep() + except KeyboardInterrupt: + _logger.debug("Multiprocess clean stop") + self.stop() + break + except Exception,e: + _logger.exception(e) + self.stop(False) + sys.exit(-1) + + def stop(self, graceful=True): + if graceful: + _logger.info("Stopping gracefully") + limit = time.time() + self.timeout + for pid in self.workers.keys(): + self.worker_kill(pid, signal.SIGTERM) + while self.workers and time.time() < limit: + self.process_zombie() + time.sleep(0.1) + else: + _logger.info("Stopping 
forcefully") + for pid in self.workers.keys(): + self.worker_kill(pid, signal.SIGTERM) + self.socket_listen.close() + +class Unicorn(object): + """ Workers """ + def __init__(self, multi): + self.multi = multi + self.watchdog_time = time.time() + self.watchdog_pipe = multi.pipe_new() + self.watchdog_timeout = multi.timeout + self.ppid = os.getpid() + self.pid = None + self.alive = True + # should we rename into lifetime ? + self.request_max = multi.limit_request + self.request_count = 0 + + def close(self): + os.close(self.watchdog_pipe[0]) + os.close(self.watchdog_pipe[1]) + + def signal_handler(self, sig, frame): + self.alive = False + + def sleep(self): + try: + ret = select.select([self.multi.socket], [], [], self.multi.beat) + except select.error, e: + if e[0] not in [errno.EINTR]: + raise + + def process_limit(self): + # If our parent changed sucide + if self.ppid != os.getppid(): + _logger.info("Worker (%s) Parent changed", self.pid) + self.alive = False + # check for lifetime + if self.request_count >= self.request_max: + _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count) + self.alive = False + # Reset the worker if it consumes too much memory (e.g. caused by a memory leak). + rss, vms = psutil.Process(os.getpid()).get_memory_info() + if vms > config['limit_memory_soft']: + _logger.info('Virtual memory consumption too high, rebooting the worker.') + self.alive = False # Commit suicide after the request. + + # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space + soft, hard = resource.getrlimit(resource.RLIMIT_AS) + resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard)) + + # SIGXCPU (exceeded CPU time) signal handler will raise an exception. 
+ r = resource.getrusage(resource.RUSAGE_SELF) + cpu_time = r.ru_utime + r.ru_stime + def time_expired(n, stack): + _logger.info('CPU time limit exceeded.') + raise Exception('CPU time limit exceeded.') + signal.signal(signal.SIGXCPU, time_expired) + soft, hard = resource.getrlimit(resource.RLIMIT_CPU) + resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard)) + + def process_work(self): + pass + + def start(self): + self.pid = os.getpid() + _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid) + # Reseed the random number generator + random.seed() + # Prevent fd inherientence close_on_exec + flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags) + # reset blocking status + self.multi.socket.setblocking(0) + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGCHLD, signal.SIG_DFL) + + def stop(self): + pass + + def run(self): + try: + self.start() + while self.alive: + self.process_limit() + self.multi.pipe_ping(self.watchdog_pipe) + self.sleep() + self.process_work() + _logger.info("Worker (%s) exiting...",self.pid) + self.stop() + except Exception,e: + _logger.exception("Worker (%s) Exception occured, exiting..."%self.pid) + # should we use 3 to abort everything ? 
+ sys.exit(1) + +class UnicornHTTP(Unicorn): + """ HTTP Request workers """ + def process_request(self, client, addr): + client.setblocking(1) + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # Prevent fd inherientence close_on_exec + flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC + fcntl.fcntl(client, fcntl.F_SETFD, flags) + # do request using UnicornBaseWSGIServer monkey patched with socket + self.server.socket = client + self.server.process_request(client,addr) + self.request_count += 1 + + def process_work(self): + try: + client, addr = self.multi.socket.accept() + self.process_request(client, addr) + except socket.error, e: + if e[0] not in (errno.EAGAIN, errno.ECONNABORTED): + raise + + def start(self): + Unicorn.start(self) + self.server = UnicornBaseWSGIServer(self.multi.app) + +class UnicornBaseWSGIServer(werkzeug.serving.BaseWSGIServer): + """ werkzeug WSGI Server patched to allow using an external listen socket + """ + def __init__(self, app): + werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app) + def server_bind(self): + # we dont bind beause we use the listen socket of Multicorn#socket + # instead we close the socket + if self.socket: + self.socket.close() + def server_activate(self): + # dont listen as we use Multicorn#socket + pass + +class Unicron(Unicorn): + """ Cron workers """ + def sleep(self): + time.sleep(60) + + def process_work(self): + if config['db_name']: + db_names = config['db_name'].split(',') + else: + db_names = openerp.netsvc.ExportService._services['db'].exp_list(True) + for db_name in db_names: + while True: + # TODO Each job should be considered as one request in multiprocessing + acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name) + if not acquired: + break + self.request_count += 1 + + def start(self): + Unicorn.start(self) # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: === removed file 'openerp/wsgi/proxied.py' --- openerp/wsgi/proxied.py 
2012-02-10 15:06:15 +0000 +++ openerp/wsgi/proxied.py 1970-01-01 00:00:00 +0000 @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# -# OpenERP, Open Source Management Solution -# Copyright (C) 2012 OpenERP s.a. (<http://openerp.com>). -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# -############################################################################## - -""" - -WSGI entry point with Proxy mode (from Werkzeug). - -""" - -from werkzeug.contrib.fixers import ProxyFix - -from . import core - -application = ProxyFix(core.application) - -# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
_______________________________________________
Mailing list: https://launchpad.net/~openerp-dev-gtk
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~openerp-dev-gtk
More help   : https://help.launchpad.net/ListHelp

