Vo Minh Thu (OpenERP) has proposed merging
lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt into
lp:openobject-server/6.1.
Requested reviews:
OpenERP Core Team (openerp)
For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt/+merge/99897
--
https://code.launchpad.net/~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt/+merge/99897
Your team OpenERP R&D Team is subscribed to branch
lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt.
=== added file 'openerp-cron-worker'
--- openerp-cron-worker 1970-01-01 00:00:00 +0000
+++ openerp-cron-worker 2012-03-30 10:36:19 +0000
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+OpenERP cron jobs worker
+
+This script executes OpenERP cron jobs. Normally, cron jobs are handled by the
+OpenERP server but depending on deployment needs, independent worker processes
+can be used. This is especially the case when the server is run via Gunicorn.
+
+OpenERP cron jobs worker re-uses openerp-server command-line options but does
+not honor all of them.
+
+Meaningful options include:
+
+ -d, --database comma-separated list of databases to monitor for cron jobs
+ processing. If left empty, the worker monitors all databases
+ (given by `psql -lAt`).
+
+ --addons-path as usual.
+
+ --cpu-time-limit
+ --virtual-memory-limit
+ --virtual-memory-reset Those three options have the same meaning as for
+ the server with Gunicorn. The only catch is: to
+ avoid enabling rlimits by default, those options are
+ honored only when --cpu-time-limit is different from
+ 60 (its default value).
+"""
+
+import logging
+import os
+import signal
+import sys
+
+import openerp
+
+# Also use the `openerp` logger for the main script.
+_logger = logging.getLogger('openerp')
+
+# Variable keeping track of the number of calls to the signal handler defined
+# below. It is read only by ``signal_handler()`` to detect a repeated signal.
+quit_signals_received = 0
+
+# TODO copy/pasted from openerp-server
+def signal_handler(sig, frame):
+ """ Signal handler: exit ungracefully on the second handled signal.
+
+ :param sig: the signal number
+ :param frame: the interrupted stack frame or None
+ """
+ global quit_signals_received
+ quit_signals_received += 1
+ import openerp.addons.base
+ openerp.addons.base.ir.ir_cron.quit_signal_received = True
+ if quit_signals_received == 1 and openerp.addons.base.ir.ir_cron.job_in_progress:
+ _logger.info("Waiting for the current job to complete.")
+ print "Waiting for the current job to complete."
+ print "Hit Ctrl-C again to force shutdown."
+ if quit_signals_received > 1:
+ # logging.shutdown was already called at this point.
+ sys.stderr.write("Forced shutdown.\n")
+ os._exit(0)
+
+# TODO copy/pasted from openerp-server
+def setup_signal_handlers():
+ """ Register the signal handler defined above. """
+ SIGNALS = map(lambda x: getattr(signal, "SIG%s" % x), "INT TERM".split())
+ if os.name == 'posix':
+ map(lambda sig: signal.signal(sig, signal_handler), SIGNALS)
+ elif os.name == 'nt':
+ import win32api
+ win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
+
+def list_databases():
+ import subprocess
+ p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE)
+ p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE)
+ p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
+ output = p2.communicate()[0]
+ databases = output.splitlines()
+ # TODO filter out non-OpenERP databases
+ databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']]
+ databases = [d for d in databases if not d.startswith('postgres')]
+ return databases
+
+if __name__ == '__main__':
+ os.environ['TZ'] = 'UTC'
+ openerp.tools.config.parse_config(sys.argv[1:])
+ config = openerp.tools.config
+ if config['log_handler'] == [':INFO']:
+ # Replace the default value, which is suitable for openerp-server.
+ config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG')
+ setup_signal_handlers()
+ openerp.modules.module.initialize_sys_path()
+ openerp.modules.loading.open_openerp_namespace()
+ openerp.netsvc.init_logger()
+ openerp.cron.enable_schedule_wakeup = False
+ import openerp.addons.base
+ print "OpenERP cron jobs worker. Hit Ctrl-C to exit."
+ print "Documentation is available at the top of the `opener-cron-worker` file."
+ if config['db_name']:
+ db_names = config['db_name'].split(',')
+ print "Monitoring %s databases." % len(db_names)
+ else:
+ db_names = list_databases
+ print "Monitored databases are auto-discovered."
+ openerp.addons.base.ir.ir_cron.ir_cron._run(db_names)
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
=== modified file 'openerp/__init__.py'
--- openerp/__init__.py 2012-02-08 14:28:34 +0000
+++ openerp/__init__.py 2012-03-30 10:36:19 +0000
@@ -27,6 +27,7 @@
import addons
import conf
+import cron
import loglevels
import modules
import netsvc
=== modified file 'openerp/addons/base/ir/ir_cron.py'
--- openerp/addons/base/ir/ir_cron.py 2012-02-02 09:21:05 +0000
+++ openerp/addons/base/ir/ir_cron.py 2012-03-30 10:36:19 +0000
@@ -39,6 +39,14 @@
_logger = logging.getLogger(__name__)
+# This variable can be set by a signal handler to stop the infinite loop in
+# ir_cron._run()
+quit_signal_received = False
+
+# This variable can be checked to know if ir_cron._run() is processing a job or
+# sleeping.
+job_in_progress = True
+
def str2tuple(s):
return eval('tuple(%s)' % (s or ''))
@@ -266,6 +274,151 @@
cr.commit()
cr.close()
+ def _process_job(self, cr, job):
+ """ Run a given job taking care of the repetition.
+
+ The cursor has a lock on the job (acquired by _acquire_job()).
+
+ :param job: job to be run (as a dictionary).
+ """
+ try:
+ now = datetime.now()
+ nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
+ numbercall = job['numbercall']
+
+ ok = False
+ while nextcall < now and numbercall:
+ if numbercall > 0:
+ numbercall -= 1
+ if not ok or job['doall']:
+ self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+ if numbercall:
+ nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+ ok = True
+ addsql = ''
+ if not numbercall:
+ addsql = ', active=False'
+ cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
+ (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
+
+ finally:
+ cr.commit()
+ cr.close()
+
+ @classmethod
+ def _acquire_job(cls, db_name):
+ # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+ """ Try to process one cron job.
+
+ This selects in database all the jobs that should be processed. It then
+ tries to lock each of them and, if it succeeds, run the cron job (if it
+ doesn't succeed, it means the job was already locked to be taken care
+ of by another thread) and return.
+
+ If a job was processed, returns True, otherwise returns False.
+ """
+ db = openerp.sql_db.db_connect(db_name)
+ cr = db.cursor()
+ try:
+ # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
+ cr.execute("""SELECT * FROM ir_cron
+ WHERE numbercall != 0
+ AND active AND nextcall <= (now() at time zone 'UTC')
+ ORDER BY priority""")
+ for job in cr.dictfetchall():
+ task_cr = db.cursor()
+ try:
+ # Try to grab an exclusive lock on the job row from within the task transaction
+ acquired_lock = False
+ task_cr.execute("""SELECT *
+ FROM ir_cron
+ WHERE id=%s
+ FOR UPDATE NOWAIT""",
+ (job['id'],), log_exceptions=False)
+ acquired_lock = True
+ except psycopg2.OperationalError, e:
+ if e.pgcode == '55P03':
+ # Class 55: Object not in prerequisite state; 55P03: lock_not_available
+ _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
+ continue
+ else:
+ # Unexpected OperationalError
+ raise
+ finally:
+ if not acquired_lock:
+ # we're exiting due to an exception while acquiring the lock
+ task_cr.close()
+
+ # Got the lock on the job row, run its code
+ _logger.debug('Starting job `%s`.', job['name'])
+ registry = openerp.pooler.get_pool(db_name)
+ registry[cls._name]._process_job(task_cr, job)
+ return True
+
+ except psycopg2.ProgrammingError, e:
+ if e.pgcode == '42P01':
+ # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
+ # The table ir_cron does not exist; this is probably not an OpenERP database.
+ _logger.warning('Tried to poll an undefined table on database %s.', db_name)
+ else:
+ raise
+ except Exception, ex:
+ _logger.warning('Exception in cron:', exc_info=True)
+
+ finally:
+ cr.commit()
+ cr.close()
+
+ return False
+
+ @classmethod
+ def _run(cls, db_names):
+ """
+ Class method intended to be run in a dedicated process to handle jobs.
+ Roughly every 60 seconds, this polls the databases for jobs that can be run.
+
+ :param db_names: list of database names to poll or callable to
+ generate such a list.
+ """
+ global quit_signal_received
+ while not quit_signal_received:
+ if callable(db_names):
+ names = db_names()
+ else:
+ names = db_names
+ for x in xrange(5):
+ if quit_signal_received:
+ return
+ t1 = time.time()
+ for db_name in names:
+ while True:
+ # Small hack to re-use the openerp-server config:
+ # If cpu_time_limit does not have its default value (60),
+ # we truly want to establish limits.
+ if openerp.tools.config['cpu_time_limit'] != 60:
+ openerp.wsgi.core.pre_request('dummy', 'dummy')
+ acquired = cls._acquire_job(db_name)
+ if openerp.tools.config['cpu_time_limit'] != 60:
+ class W(object):
+ alive = True
+ worker = W()
+ openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
+ if not worker.alive:
+ return
+ if not acquired:
+ break
+ if quit_signal_received:
+ return
+ t2 = time.time()
+ t = t2 - t1
+ global job_in_progress
+ if t > 60:
+ _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
+ else:
+ job_in_progress = False
+ time.sleep(60 - t)
+ job_in_progress = True
+
def update_running_cron(self, cr):
""" Schedule as soon as possible a wake-up for this database. """
# Verify whether the server is already started and thus whether we need to commit
=== modified file 'openerp/cron.py'
--- openerp/cron.py 2012-02-13 11:53:54 +0000
+++ openerp/cron.py 2012-03-30 10:36:19 +0000
@@ -49,6 +49,13 @@
_logger = logging.getLogger(__name__)
+# Scheduling wake-ups (see below) can be disabled when the polling process
+# workers are used instead of the managed thread workers. (I.e. wake-ups are
+# not used since polling is used. And polling is used when cron jobs are
+# handled by running special processes, e.g. openerp-cron-worker, instead
+# of the general openerp-server script.)
+enable_schedule_wakeup = True
+
# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
# the context of the cron management. This is not originally about loading
# a database, although having the database name in the queue will
@@ -135,7 +142,6 @@
_wakeups = []
_wakeup_by_db = {}
-
def schedule_wakeup(timestamp, db_name):
""" Schedule a new wake-up for a database.
@@ -147,6 +153,9 @@
:param timestamp: when the wake-up is scheduled.
"""
+ global enable_schedule_wakeup
+ if not enable_schedule_wakeup:
+ return
if not timestamp:
return
with _wakeups_lock:
=== modified file 'openerp/wsgi/core.py'
--- openerp/wsgi/core.py 2012-03-05 16:37:30 +0000
+++ openerp/wsgi/core.py 2012-03-30 10:36:19 +0000
@@ -506,7 +506,7 @@
rss, vms = psutil.Process(os.getpid()).get_memory_info()
if vms > config['virtual_memory_reset']:
_logger.info('Virtual memory consumption '
- 'too high, rebooting the worker.')
+ 'too high, killing the worker.')
worker.alive = False # Commit suicide after the request.
# SIGXCPU (exceeded CPU time) signal handler will raise an exception.
_______________________________________________
Mailing list: https://launchpad.net/~openerp-dev-gtk
Post to : [email protected]
Unsubscribe : https://launchpad.net/~openerp-dev-gtk
More help : https://help.launchpad.net/ListHelp