Vo Minh Thu (OpenERP) has proposed merging 
lp:~openerp-dev/openobject-server/trunk-threaded-cron-vmt into 
lp:openobject-server.

Requested reviews:
  OpenERP Core Team (openerp)

For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-threaded-cron-vmt/+merge/68064
-- 
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-threaded-cron-vmt/+merge/68064
Your team OpenERP R&D Team is subscribed to branch 
lp:~openerp-dev/openobject-server/trunk-threaded-cron-vmt.
=== modified file 'openerp-server'
--- openerp-server	2011-07-06 13:44:52 +0000
+++ openerp-server	2011-07-15 09:46:56 +0000
@@ -100,7 +100,7 @@
 
 if config['db_name']:
     for dbname in config['db_name'].split(','):
-        db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
+        db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
         cr = db.cursor()
 
         if config["test_file"]:
@@ -108,7 +108,8 @@
             openerp.tools.convert_yaml_import(cr, 'base', file(config["test_file"]), {}, 'test', True)
             cr.rollback()
 
-        pool.get('ir.cron').restart(db.dbname)
+        # jobs will start to be processed later, when openerp.cron.start_master_thread below is called.
+        registry.schedule_cron_jobs()
 
         cr.close()
 
@@ -152,7 +153,7 @@
 if config["stop_after_init"]:
     sys.exit(0)
 
-openerp.netsvc.start_agent()
+openerp.cron.start_master_thread()
 
 #----------------------------------------------------------
 # Launch Servers
@@ -198,7 +199,8 @@
     signal.signal(signal.SIGQUIT, dumpstacks)
 
 def quit():
-    openerp.netsvc.Agent.quit()
+    # stop scheduling new jobs; we will have to wait for the jobs to complete below
+    openerp.cron.cancel_all()
     openerp.netsvc.Server.quitAll()
     if config['pidfile']:
         os.unlink(config['pidfile'])
@@ -214,9 +216,10 @@
         if thread != threading.currentThread() and not thread.isDaemon():
             while thread.isAlive():
                 # need a busyloop here as thread.join() masks signals
-                # and would present the forced shutdown
+                # and would prevent the forced shutdown
                 thread.join(0.05)
                 time.sleep(0.05)
+    openerp.modules.registry.RegistryManager.delete_all()
     sys.exit(0)
 
 if config['pidfile']:

=== modified file 'openerp/addons/base/ir/ir_cron.py'
--- openerp/addons/base/ir/ir_cron.py	2011-06-28 15:31:48 +0000
+++ openerp/addons/base/ir/ir_cron.py	2011-07-15 09:46:56 +0000
@@ -21,6 +21,8 @@
 
 import time
 import logging
+import threading
+import psycopg2
 from datetime import datetime
 from dateutil.relativedelta import relativedelta
 import netsvc
@@ -28,6 +30,7 @@
 from tools.safe_eval import safe_eval as eval
 import pooler
 from osv import fields, osv
+import openerp
 
 def str2tuple(s):
     return eval('tuple(%s)' % (s or ''))
@@ -41,9 +44,23 @@
     'minutes': lambda interval: relativedelta(minutes=interval),
 }
 
-class ir_cron(osv.osv, netsvc.Agent):
+JOB = {
+    'function': u'f',
+    'interval_type': u'minutes',
+    'user_id': 1,
+    'name': u'test',
+    'args': False,
+    'numbercall': 1,
+    'nextcall': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
+    'priority': 5,
+    'doall': True,
+    'active': True,
+    'interval_number': 1,
+    'model': u'ir.cron'
+}
+
+class ir_cron(osv.osv):
     """ This is the ORM object that periodically executes actions.
-        Note that we use the netsvc.Agent()._logger member.
     """
     _name = "ir.cron"
     _order = 'name'
@@ -74,6 +91,21 @@
         'doall' : lambda *a: 1
     }
 
+    _logger = logging.getLogger('cron')
+
+    def f(a, b, c):
+        print ">>> in f"
+
+    def expensive(a, b, c):
+        print ">>> in expensive"
+        time.sleep(80)
+        print ">>> out expensive"
+
+    def expensive_2(a, b, c):
+        print ">>> in expensive_2"
+        time.sleep(30)
+        print ">>> out expensive_2"
+
     def _check_args(self, cr, uid, ids, context=None):
         try:
             for this in self.browse(cr, uid, ids, context):
@@ -88,8 +120,7 @@
 
     def _handle_callback_exception(self, cr, uid, model, func, args, job_id, job_exception):
         cr.rollback()
-        logger=logging.getLogger('cron')
-        logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model, func, args, job_id))
+        self._logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model, func, args, job_id))
 
     def _callback(self, cr, uid, model, func, args, job_id):
         args = str2tuple(args)
@@ -109,45 +140,101 @@
             except Exception, e:
                 self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
 
-    def _poolJobs(self, db_name, check=False):
+    def _run_job(self, cr, job, now):
+        """ Run a given job taking care of the repetition. """
         try:
-            db, pool = pooler.get_db_and_pool(db_name)
-        except:
-            return False
+            nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+            numbercall = job['numbercall']
+
+            ok = False
+            while nextcall < now and numbercall:
+                if numbercall > 0:
+                    numbercall -= 1
+                if not ok or job['doall']:
+                    self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+                if numbercall:
+                    nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+                ok = True
+            addsql = ''
+            if not numbercall:
+                addsql = ', active=False'
+            cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
+
+            if numbercall:
+                # Reschedule our own main cron thread if necessary.
+                # This is really needed if this job runs longer than its rescheduling period.
+                print ">>> advance at", nextcall
+                nextcall = time.mktime(time.strptime(nextcall.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S'))
+                openerp.cron.schedule_in_advance(nextcall, cr.dbname)
+        finally:
+            cr.commit()
+            cr.close()
+            openerp.cron.inc_thread_count()
+
+    def _run_jobs(self):
+        # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+        """ Process the cron jobs by spawning worker threads.
+
+        This selects from the database all the jobs that should be processed. It
+        then tries to lock each of them and, if it succeeds, spawns a thread to
+        run the cron job (if it doesn't succeed, it means the job was already
+        locked to be taken care of by another thread).
+
+        """
+        print ">>> _run_jobs"
+        db = self.pool.db
         cr = db.cursor()
+        db_name = db.dbname
         try:
-            if not pool._init:
-                now = datetime.now()
-                cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
-                for job in cr.dictfetchall():
-                    nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
-                    numbercall = job['numbercall']
-
-                    ok = False
-                    while nextcall < now and numbercall:
-                        if numbercall > 0:
-                            numbercall -= 1
-                        if not ok or job['doall']:
-                            self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
-                        if numbercall:
-                            nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
-                        ok = True
-                    addsql = ''
-                    if not numbercall:
-                        addsql = ', active=False'
-                    cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
-                    cr.commit()
-
-
-            cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
+            jobs = {} # mapping job ids to jobs for all jobs being processed.
+            now = datetime.now()
+            cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
+            for job in cr.dictfetchall():
+                print ">>>", openerp.cron.get_thread_count(), "threads"
+                if not openerp.cron.get_thread_count():
+                    break
+                task_cr = db.cursor()
+                task_job = None
+                jobs[job['id']] = job
+
+                try:
+                    # Try to lock the job...
+                    task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
+                    task_job = task_cr.dictfetchall()[0]
+                except psycopg2.OperationalError, e:
+                    if e.pgcode == '55P03':
+                        # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+                        # ... and fail.
+                        print ">>>", job['name'], " is already being processed"
+                        continue
+                    else:
+                        raise
+                finally:
+                    if not task_job:
+                        task_cr.close()
+
+                # ... and succeed.
+                print ">>> taking care of", job['name']
+                task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+                task_thread.setDaemon(False)
+                openerp.cron.dec_thread_count()
+                task_thread.start()
+
+            # Wake up time, without considering the currently processed jobs.
+            if jobs.keys():
+                cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
+            else:
+                cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
             next_call = cr.dictfetchone()['min_next_call']
+            print ">>> possibility at ", next_call
+
             if next_call:
                 next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
             else:
                 next_call = int(time.time()) + 3600   # if do not find active cron job from database, it will run again after 1 day
 
-            if not check:
-                self.setAlarm(self._poolJobs, next_call, db_name, db_name)
+            openerp.cron.schedule_in_advance(next_call, db_name)
 
         except Exception, ex:
             self._logger.warning('Exception in cron:', exc_info=True)
@@ -156,11 +243,6 @@
             cr.commit()
             cr.close()
 
-    def restart(self, dbname):
-        self.cancel(dbname)
-        # Reschedule cron processing job asap, but not in the current thread
-        self.setAlarm(self._poolJobs, time.time(), dbname, dbname)
-
     def update_running_cron(self, cr):
         # Verify whether the server is already started and thus whether we need to commit
         # immediately our changes and restart the cron agent in order to apply the change
@@ -171,7 +253,45 @@
         # when the server is only starting or loading modules (hence the test on pool._init).
         if not self.pool._init:
             cr.commit()
-            self.restart(cr.dbname)
+            openerp.cron.schedule_in_advance(1, self.pool.db.dbname)
+
+    def _20_seconds(self, cr, uid):
+        print ">>> in _20_seconds"
+        time.sleep(20)
+        print ">>> out _20_seconds"
+
+    def _80_seconds(self, cr, uid):
+        print ">>> in _80_seconds"
+        time.sleep(80)
+        print ">>> out _80_seconds"
+
+    def test_0(self, cr, uid):
+        now = datetime.now()
+        t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
+        t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S')
+        t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S')
+        self.create(cr, uid, dict(JOB, name='test_0 _20_seconds A', function='_20_seconds', nextcall=t1))
+        self.create(cr, uid, dict(JOB, name='test_0 _20_seconds B', function='_20_seconds', nextcall=t2))
+        self.create(cr, uid, dict(JOB, name='test_0 _20_seconds C', function='_20_seconds', nextcall=t3))
+
+    def test_1(self, cr, uid):
+        now = datetime.now()
+        t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
+        self.create(cr, uid, dict(JOB, name='test_1 _20_seconds * 3', function='_20_seconds', nextcall=t1, numbercall=3))
+
+    def test_2(self, cr, uid):
+        now = datetime.now()
+        t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
+        self.create(cr, uid, dict(JOB, name='test_2 _80_seconds * 2', function='_80_seconds', nextcall=t1, numbercall=2))
+
+    def test_3(self, cr, uid):
+        now = datetime.now()
+        t1 = (now + relativedelta(minutes=1)).strftime('%Y-%m-%d %H:%M:%S')
+        t2 = (now + relativedelta(minutes=1, seconds=5)).strftime('%Y-%m-%d %H:%M:%S')
+        t3 = (now + relativedelta(minutes=1, seconds=10)).strftime('%Y-%m-%d %H:%M:%S')
+        self.create(cr, uid, dict(JOB, name='test_3 _80_seconds A', function='_80_seconds', nextcall=t1))
+        self.create(cr, uid, dict(JOB, name='test_3 _20_seconds B', function='_20_seconds', nextcall=t2))
+        self.create(cr, uid, dict(JOB, name='test_3 _20_seconds C', function='_20_seconds', nextcall=t3))
 
     def create(self, cr, uid, vals, context=None):
         res = super(ir_cron, self).create(cr, uid, vals, context=context)

=== modified file 'openerp/modules/registry.py'
--- openerp/modules/registry.py	2011-06-15 16:01:23 +0000
+++ openerp/modules/registry.py	2011-07-15 09:46:56 +0000
@@ -25,6 +25,8 @@
 
 import openerp.sql_db
 import openerp.osv.orm
+import openerp.cron
+import openerp.tools
 
 
 class Registry(object):
@@ -82,6 +84,17 @@
 
         return res
 
+    def schedule_cron_jobs(self):
+        """ Make the cron thread care about this registry/database jobs.
+
+        This will initiate the cron thread to check for any pending jobs for
+        this registry/database as soon as possible. Then it will continuously
+        monitor the ir.cron model for future jobs. See openerp.cron for
+        details.
+        
+        """
+        openerp.cron.schedule_in_advance(1, self.db.dbname)
+
 
 class RegistryManager(object):
     """ Model registries manager.
@@ -95,7 +108,6 @@
     # Accessed through the methods below.
     registries = {}
 
-
     @classmethod
     def get(cls, db_name, force_demo=False, status=None, update_module=False,
             pooljobs=True):
@@ -108,7 +120,6 @@
                 update_module, pooljobs)
         return registry
 
-
     @classmethod
     def new(cls, db_name, force_demo=False, status=None,
             update_module=False, pooljobs=True):
@@ -143,16 +154,32 @@
             cr.close()
 
         if pooljobs:
-            registry.get('ir.cron').restart(registry.db.dbname)
+            registry.schedule_cron_jobs()
 
         return registry
 
-
     @classmethod
     def delete(cls, db_name):
-        """ Delete the registry linked to a given database. """
+        """ Delete the registry linked to a given database.
+
+        This also cleans the associated caches. For good measure this also
+        cancels the associated cron job. But please note that the cron job can
+        be running and take some time before ending, and that you should not
+        remove a registry if it can still be used by some thread. So it might
+        be necessary to call openerp.cron.Agent.cancel(db_name) yourself and
+        join (i.e. wait for) the thread.
+
+        """
         if db_name in cls.registries:
             del cls.registries[db_name]
+        openerp.tools.cache.clean_caches_for_db(db_name)
+        openerp.cron.cancel(db_name)
+
+    @classmethod
+    def delete_all(cls):
+        """ Delete all the registries. """
+        for db_name in cls.registries.keys():
+            cls.delete(db_name)
 
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

=== modified file 'openerp/netsvc.py'
--- openerp/netsvc.py	2011-07-01 23:19:56 +0000
+++ openerp/netsvc.py	2011-07-15 09:46:56 +0000
@@ -21,7 +21,6 @@
 ##############################################################################
 
 import errno
-import heapq
 import logging
 import logging.handlers
 import os
@@ -37,6 +36,7 @@
 # TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
 from loglevels import *
 import tools
+import openerp
 
 def close_socket(sock):
     """ Closes a socket instance cleanly
@@ -244,81 +244,6 @@
     logger.addHandler(handler)
     logger.setLevel(logging.ERROR)
 
-class Agent(object):
-    """ Singleton that keeps track of cancellable tasks to run at a given
-        timestamp.
-       
-        The tasks are characterised by:
-       
-            * a timestamp
-            * the database on which the task run
-            * the function to call
-            * the arguments and keyword arguments to pass to the function
-
-        Implementation details:
-        
-          - Tasks are stored as list, allowing the cancellation by setting
-            the timestamp to 0.
-          - A heapq is used to store tasks, so we don't need to sort
-            tasks ourself.
-    """
-    __tasks = []
-    __tasks_by_db = {}
-    _logger = logging.getLogger('netsvc.agent')
-
-    @classmethod
-    def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
-        task = [timestamp, db_name, function, args, kwargs]
-        heapq.heappush(cls.__tasks, task)
-        cls.__tasks_by_db.setdefault(db_name, []).append(task)
-
-    @classmethod
-    def cancel(cls, db_name):
-        """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
-        cls._logger.debug("Cancel timers for %s db", db_name or 'all')
-        if db_name is None:
-            cls.__tasks, cls.__tasks_by_db = [], {}
-        else:
-            if db_name in cls.__tasks_by_db:
-                for task in cls.__tasks_by_db[db_name]:
-                    task[0] = 0
-
-    @classmethod
-    def quit(cls):
-        cls.cancel(None)
-
-    @classmethod
-    def runner(cls):
-        """Neverending function (intended to be ran in a dedicated thread) that
-           checks every 60 seconds tasks to run. TODO: make configurable
-        """
-        current_thread = threading.currentThread()
-        while True:
-            while cls.__tasks and cls.__tasks[0][0] < time.time():
-                task = heapq.heappop(cls.__tasks)
-                timestamp, dbname, function, args, kwargs = task
-                cls.__tasks_by_db[dbname].remove(task)
-                if not timestamp:
-                    # null timestamp -> cancelled task
-                    continue
-                current_thread.dbname = dbname   # hack hack
-                cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
-                delattr(current_thread, 'dbname')
-                task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
-                # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
-                task_thread.setDaemon(False)
-                task_thread.start()
-                time.sleep(1)
-            time.sleep(60)
-
-def start_agent():
-    agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
-    # the agent runner is a typical daemon thread, that will never quit and must be
-    # terminated when the main process exits - with no consequence (the processing
-    # threads it spawns are not marked daemon)
-    agent_runner.setDaemon(True)
-    agent_runner.start()
-
 import traceback
 
 class Server:

=== modified file 'openerp/pooler.py'
--- openerp/pooler.py	2011-06-15 07:22:31 +0000
+++ openerp/pooler.py	2011-07-15 09:46:56 +0000
@@ -34,11 +34,6 @@
     return registry.db, registry
 
 
-def delete_pool(db_name):
-    """Delete an existing registry."""
-    RegistryManager.delete(db_name)
-
-
 def restart_pool(db_name, force_demo=False, status=None, update_module=False):
     """Delete an existing registry and return a database connection and a newly initialized registry."""
     registry = RegistryManager.new(db_name, force_demo, status, update_module, True)

=== modified file 'openerp/service/web_services.py'
--- openerp/service/web_services.py	2011-06-23 09:04:57 +0000
+++ openerp/service/web_services.py	2011-07-15 09:46:56 +0000
@@ -161,8 +161,8 @@
                 raise Exception, e
 
     def exp_drop(self, db_name):
+        openerp.modules.registry.RegistryManager.delete(db_name)
         sql_db.close_db(db_name)
-        openerp.netsvc.Agent.cancel(db_name)
         logger = netsvc.Logger()
 
         db = sql_db.db_connect('template1')
@@ -264,8 +264,8 @@
         return True
 
     def exp_rename(self, old_name, new_name):
+        openerp.modules.registry.RegistryManager.delete(old_name)
         sql_db.close_db(old_name)
-        openerp.netsvc.Agent.cancel(db_name)
         logger = netsvc.Logger()
 
         db = sql_db.db_connect('template1')

=== modified file 'openerp/sql_db.py'
--- openerp/sql_db.py	2011-07-14 10:55:52 +0000
+++ openerp/sql_db.py	2011-07-15 09:46:56 +0000
@@ -498,9 +498,8 @@
     return Connection(_Pool, db_name)
 
 def close_db(db_name):
-    """ You might want to call openerp.netsvc.Agent.cancel(db_name) along this function."""
+    """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) along this function."""
     _Pool.close_all(dsn(db_name))
-    tools.cache.clean_caches_for_db(db_name)
     ct = currentThread()
     if hasattr(ct, 'dbname'):
         delattr(ct, 'dbname')

_______________________________________________
Mailing list: https://launchpad.net/~openerp-dev-gtk
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~openerp-dev-gtk
More help   : https://help.launchpad.net/ListHelp

Reply via email to