Vo Minh Thu (OpenERP) has proposed merging 
lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt into 
lp:openobject-server/6.1.

Requested reviews:
  OpenERP Core Team (openerp)

For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt/+merge/99897
=== added file 'openerp-cron-worker'
--- openerp-cron-worker	1970-01-01 00:00:00 +0000
+++ openerp-cron-worker	2012-03-30 10:36:19 +0000
@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+OpenERP cron jobs worker
+
+This script executes OpenERP cron jobs. Normally, cron jobs are handled by the
+OpenERP server but depending on deployment needs, independent worker processes
+can be used. This is especially the case when the server is run via Gunicorn.
+
+The OpenERP cron jobs worker re-uses the openerp-server command-line options,
+but does not honor all of them.
+
+Meaningful options include:
+
+  -d, --database  comma-separated list of databases to monitor for cron job
+                  processing. If left empty, the worker monitors all databases
+                  (as listed by `psql -l`).
+
+  --addons-path   as usual.
+
+  --cpu-time-limit
+  --virtual-memory-limit
+  --virtual-memory-reset  Those three options have the same meaning as for
+                          the server run with Gunicorn. The only catch: so
+                          that rlimits are not enabled by default, those
+                          options are honored only when --cpu-time-limit is
+                          different from 60 (its default value).
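+
+Example invocation (hypothetical database names and addons path):
+
+  ./openerp-cron-worker -d db1,db2 --addons-path=/path/to/addons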
+"""
+
+import logging
+import os
+import signal
+import sys
+
+import openerp
+
+# Also use the `openerp` logger for the main script.
+_logger = logging.getLogger('openerp')
+
+# Variable keeping track of the number of calls to the signal handler defined
+# below. This variable is monitored by ``quit_on_signals()``.
+quit_signals_received = 0
+
+# TODO copy/pasted from openerp-server
+def signal_handler(sig, frame):
+    """ Signal handler: exit ungracefully on the second handled signal.
+
+    :param sig: the signal number
+    :param frame: the interrupted stack frame or None
+    """
+    global quit_signals_received
+    quit_signals_received += 1
+    import openerp.addons.base
+    openerp.addons.base.ir.ir_cron.quit_signal_received = True
+    if quit_signals_received == 1 and openerp.addons.base.ir.ir_cron.job_in_progress:
+        _logger.info("Waiting for the current job to complete.")
+        print "Waiting for the current job to complete."
+        print "Hit Ctrl-C again to force shutdown."
+    if quit_signals_received > 1:
+        # logging.shutdown was already called at this point.
+        sys.stderr.write("Forced shutdown.\n")
+        os._exit(0)
+
+# TODO copy/pasted from openerp-server
+def setup_signal_handlers():
+    """ Register the signal handler defined above. """
+    SIGNALS = map(lambda x: getattr(signal, "SIG%s" % x), "INT TERM".split())
+    if os.name == 'posix':
+        map(lambda sig: signal.signal(sig, signal_handler), SIGNALS)
+    elif os.name == 'nt':
+        import win32api
+        win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
+
+def list_databases():
+    import subprocess
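+    # Roughly equivalent to the shell pipeline `psql -lAt | cut -f 1 -d '|'`:
+    # list every database, unaligned and without headers, keeping only the
+    # first column (the database name).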
+    p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE)
+    p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE)
+    p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
+    output = p2.communicate()[0]
+    databases = output.splitlines()
+    # TODO filter out non-OpenERP databases
+    databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']]
+    databases = [d for d in databases if not d.startswith('postgres')]
+    return databases
+
+if __name__ == '__main__':
+    os.environ['TZ'] = 'UTC'
+    openerp.tools.config.parse_config(sys.argv[1:])
+    config = openerp.tools.config
+    if config['log_handler'] == [':INFO']:
+        # Replace the default value, which is suitable for openerp-server.
+        config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG')
+    setup_signal_handlers()
+    openerp.modules.module.initialize_sys_path()
+    openerp.modules.loading.open_openerp_namespace()
+    openerp.netsvc.init_logger()
+    openerp.cron.enable_schedule_wakeup = False
+    import openerp.addons.base
+    print "OpenERP cron jobs worker. Hit Ctrl-C to exit."
+    print "Documentation is available at the top of the `opener-cron-worker` file."
+    if config['db_name']:
+        db_names = config['db_name'].split(',')
+        print "Monitoring %s databases." % len(db_names)
+    else:
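+        # The callable itself is passed (not its result) so that
+        # ir_cron._run() can periodically re-evaluate the database list.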
+        db_names = list_databases
+        print "Monitored databases are auto-discovered."
+    openerp.addons.base.ir.ir_cron.ir_cron._run(db_names)
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

=== modified file 'openerp/__init__.py'
--- openerp/__init__.py	2012-02-08 14:28:34 +0000
+++ openerp/__init__.py	2012-03-30 10:36:19 +0000
@@ -27,6 +27,7 @@
 
 import addons
 import conf
+import cron
 import loglevels
 import modules
 import netsvc

=== modified file 'openerp/addons/base/ir/ir_cron.py'
--- openerp/addons/base/ir/ir_cron.py	2012-02-02 09:21:05 +0000
+++ openerp/addons/base/ir/ir_cron.py	2012-03-30 10:36:19 +0000
@@ -39,6 +39,14 @@
 
 _logger = logging.getLogger(__name__)
 
+# This variable can be set by a signal handler to stop the infinite loop in
+# ir_cron._run()
+quit_signal_received = False
+
+# This variable can be checked to know if ir_cron._run() is processing a job or
+# sleeping.
+job_in_progress = True
+
 def str2tuple(s):
     return eval('tuple(%s)' % (s or ''))
 
@@ -266,6 +274,151 @@
             cr.commit()
             cr.close()
 
+    def _process_job(self, cr, job):
+        """ Run a given job taking care of the repetition.
+
+        The cursor has a lock on the job (aquired by _acquire_job()).
+
+        :param job: job to be run (as a dictionary).
+        """
+        try:
+            now = datetime.now() 
+            nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
+            numbercall = job['numbercall']
+
+            ok = False
+            while nextcall < now and numbercall:
+                if numbercall > 0:
+                    numbercall -= 1
+                if not ok or job['doall']:
+                    self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+                if numbercall:
+                    nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+                ok = True
+            addsql = ''
+            if not numbercall:
+                addsql = ', active=False'
+            cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
+                       (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
+
+        finally:
+            cr.commit()
+            cr.close()
+
+    @classmethod
+    def _acquire_job(cls, db_name):
+        # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+        """ Try to process one cron job.
+
+        This selects from the database all the jobs that should be processed.
+        It then tries to lock each of them in turn; the first one it manages
+        to lock is run, and the method returns (a failed lock means the job is
+        already being taken care of by another process or thread).
+
+        If a job was processed, returns True, otherwise returns False.
+        """
+        db = openerp.sql_db.db_connect(db_name)
+        cr = db.cursor()
+        try:
+            # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
+            cr.execute("""SELECT * FROM ir_cron
+                          WHERE numbercall != 0
+                              AND active AND nextcall <= (now() at time zone 'UTC')
+                          ORDER BY priority""")
+            for job in cr.dictfetchall():
+                task_cr = db.cursor()
+                try:
+                    # Try to grab an exclusive lock on the job row from within the task transaction
+                    acquired_lock = False
+                    task_cr.execute("""SELECT *
+                                       FROM ir_cron
+                                       WHERE id=%s
+                                       FOR UPDATE NOWAIT""",
+                                   (job['id'],), log_exceptions=False)
+                    acquired_lock = True
+                except psycopg2.OperationalError, e:
+                    if e.pgcode == '55P03':
+                        # Class 55: Object not in prerequisite state; 55P03: lock_not_available
+                        _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
+                        continue
+                    else:
+                        # Unexpected OperationalError
+                        raise
+                finally:
+                    if not acquired_lock:
+                        # we're exiting due to an exception while acquiring the lock
+                        task_cr.close()
+
+                # Got the lock on the job row, run its code
+                _logger.debug('Starting job `%s`.', job['name'])
+                registry = openerp.pooler.get_pool(db_name)
+                registry[cls._name]._process_job(task_cr, job)
+                return True
+
+        except psycopg2.ProgrammingError, e:
+            if e.pgcode == '42P01':
+                # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
+                # The table ir_cron does not exist; this is probably not an OpenERP database.
+                _logger.warning('Tried to poll an undefined table on database %s.', db_name)
+            else:
+                raise
+        except Exception, ex:
+            _logger.warning('Exception in cron:', exc_info=True)
+
+        finally:
+            cr.commit()
+            cr.close()
+
+        return False
+
+    @classmethod
+    def _run(cls, db_names):
+        """
+        Class method intended to be run in a dedicated process to handle jobs.
+        This polls the databases every 60 seconds for jobs that can be run.
+
+        :param db_names: list of database names to poll or callable to
+            generate such a list.
+        """
+        global quit_signal_received
+        while not quit_signal_received:
+            if callable(db_names):
+                names = db_names()
+            else:
+                names = db_names
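+            # Poll 5 times with the current database list before looping back
+            # to re-evaluate it (roughly every 5 minutes with the 60-second
+            # poll interval below).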
+            for x in xrange(5):
+                if quit_signal_received:
+                    return
+                t1 = time.time()
+                for db_name in names:
+                    while True:
+                        # Small hack to re-use the openerp-server config:
+                        # if cpu_time_limit differs from its default value, we
+                        # really do want to enforce the rlimits.
+                        if openerp.tools.config['cpu_time_limit'] != 60:
+                            openerp.wsgi.core.pre_request('dummy', 'dummy')
+                        acquired = cls._acquire_job(db_name)
+                        if openerp.tools.config['cpu_time_limit'] != 60:
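+                            # post_request() expects a worker object whose
+                            # `alive` flag it may clear; provide a minimal
+                            # stand-in here.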
+                            class W(object):
+                                alive = True
+                            worker = W()
+                            openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
+                            if not worker.alive:
+                                return
+                        if not acquired:
+                            break
+                        if quit_signal_received:
+                            return
+                t2 = time.time()
+                t = t2 - t1
+                global job_in_progress
+                if t > 60:
+                    _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%s seconds).', int(t))
+                else:
+                    job_in_progress = False
+                    time.sleep(60 - t)
+                    job_in_progress = True
+
     def update_running_cron(self, cr):
         """ Schedule as soon as possible a wake-up for this database. """
         # Verify whether the server is already started and thus whether we need to commit

=== modified file 'openerp/cron.py'
--- openerp/cron.py	2012-02-13 11:53:54 +0000
+++ openerp/cron.py	2012-03-30 10:36:19 +0000
@@ -49,6 +49,13 @@
 
 _logger = logging.getLogger(__name__)
 
+# Scheduling wake-ups (see below) can be disabled when polling worker
+# processes are used instead of the managed worker threads: wake-ups are
+# not needed because polling is used instead, and polling is used when cron
+# jobs are handled by dedicated processes (e.g. openerp-cron-worker) rather
+# than by the regular openerp-server script.
+enable_schedule_wakeup = True
+
 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
 # the context of the cron management. This is not originally about loading
 # a database, although having the database name in the queue will
@@ -135,7 +142,6 @@
         _wakeups = []
         _wakeup_by_db = {}
 
-
 def schedule_wakeup(timestamp, db_name):
     """ Schedule a new wake-up for a database.
 
@@ -147,6 +153,9 @@
     :param timestamp: when the wake-up is scheduled.
 
     """
+    global enable_schedule_wakeup
+    if not enable_schedule_wakeup:
+        return
     if not timestamp:
         return
     with _wakeups_lock:

=== modified file 'openerp/wsgi/core.py'
--- openerp/wsgi/core.py	2012-03-05 16:37:30 +0000
+++ openerp/wsgi/core.py	2012-03-30 10:36:19 +0000
@@ -506,7 +506,7 @@
     rss, vms = psutil.Process(os.getpid()).get_memory_info()
     if vms > config['virtual_memory_reset']:
         _logger.info('Virtual memory consumption '
-            'too high, rebooting the worker.')
+            'too high, killing the worker.')
         worker.alive = False # Commit suicide after the request.
 
 # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
