Vo Minh Thu (OpenERP) has proposed merging
lp:~openerp-dev/openobject-server/trunk-threaded-cron-vmt into
lp:openobject-server.
Requested reviews:
OpenERP Core Team (openerp)
For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-threaded-cron-vmt/+merge/68064
--
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-threaded-cron-vmt/+merge/68064
Your team OpenERP R&D Team is subscribed to branch
lp:~openerp-dev/openobject-server/trunk-threaded-cron-vmt.
=== modified file 'openerp-server'
--- openerp-server 2011-07-06 13:44:52 +0000
+++ openerp-server 2011-07-15 09:46:56 +0000
@@ -100,7 +100,7 @@
if config['db_name']:
for dbname in config['db_name'].split(','):
- db, pool = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
+ db, registry = openerp.pooler.get_db_and_pool(dbname, update_module=config['init'] or config['update'], pooljobs=False)
cr = db.cursor()
if config["test_file"]:
@@ -108,7 +108,8 @@
openerp.tools.convert_yaml_import(cr, 'base', file(config["test_file"]), {}, 'test', True)
cr.rollback()
- pool.get('ir.cron').restart(db.dbname)
+ # Only registers the database; jobs are processed once openerp.cron.start_master_thread() is called below.
+ registry.schedule_cron_jobs()
cr.close()
@@ -152,7 +153,7 @@
if config["stop_after_init"]:
sys.exit(0)
-openerp.netsvc.start_agent()
+openerp.cron.start_master_thread()
#----------------------------------------------------------
# Launch Servers
@@ -198,7 +199,8 @@
signal.signal(signal.SIGQUIT, dumpstacks)
def quit():
- openerp.netsvc.Agent.quit()
+ # Stop scheduling new cron jobs; already running (non-daemon) job threads are waited for below.
+ openerp.cron.cancel_all()
openerp.netsvc.Server.quitAll()
if config['pidfile']:
os.unlink(config['pidfile'])
@@ -214,9 +216,10 @@
if thread != threading.currentThread() and not thread.isDaemon():
while thread.isAlive():
# need a busyloop here as thread.join() masks signals
- # and would present the forced shutdown
+ # and would prevent the forced shutdown
thread.join(0.05)
time.sleep(0.05)
+ openerp.modules.registry.RegistryManager.delete_all()
sys.exit(0)
if config['pidfile']:
=== modified file 'openerp/addons/base/ir/ir_cron.py'
--- openerp/addons/base/ir/ir_cron.py 2011-06-28 15:31:48 +0000
+++ openerp/addons/base/ir/ir_cron.py 2011-07-15 09:46:56 +0000
@@ -21,6 +21,8 @@
import time
import logging
+import threading
+import psycopg2
from datetime import datetime
from dateutil.relativedelta import relativedelta
import netsvc
@@ -28,6 +30,7 @@
from tools.safe_eval import safe_eval as eval
import pooler
from osv import fields, osv
+import openerp
def str2tuple(s):
return eval('tuple(%s)' % (s or ''))
@@ -41,9 +44,23 @@
'minutes': lambda interval: relativedelta(minutes=interval),
}
-class ir_cron(osv.osv, netsvc.Agent):
+JOB = {
+ 'function': u'f',
+ 'interval_type': u'minutes',
+ 'user_id': 1,
+ 'name': u'test',
+ 'args': False,
+ 'numbercall': 1,
+ 'nextcall': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
+ 'priority': 5,
+ 'doall': True,
+ 'active': True,
+ 'interval_number': 1,
+ 'model': u'ir.cron'
+}
+
+class ir_cron(osv.osv):
""" This is the ORM object that periodically executes actions.
- Note that we use the netsvc.Agent()._logger member.
"""
_name = "ir.cron"
_order = 'name'
@@ -74,6 +91,8 @@
'doall' : lambda *a: 1
}
+ _logger = logging.getLogger('cron')
+
def _check_args(self, cr, uid, ids, context=None):
try:
for this in self.browse(cr, uid, ids, context):
@@ -88,8 +120,7 @@
def _handle_callback_exception(self, cr, uid, model, func, args, job_id, job_exception):
cr.rollback()
- logger=logging.getLogger('cron')
- logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model, func, args, job_id))
+ self._logger.exception("Call of self.pool.get('%s').%s(cr, uid, *%r) failed in Job %s" % (model, func, args, job_id))
def _callback(self, cr, uid, model, func, args, job_id):
args = str2tuple(args)
@@ -109,45 +140,101 @@
except Exception, e:
self._handle_callback_exception(cr, uid, model, func, args, job_id, e)
- def _poolJobs(self, db_name, check=False):
+ def _run_job(self, cr, job, now):
+ """ Run a given job taking care of the repetition. """
try:
- db, pool = pooler.get_db_and_pool(db_name)
- except:
- return False
+ nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
+ numbercall = job['numbercall']
+
+ ok = False
+ while nextcall < now and numbercall:
+ if numbercall > 0:
+ numbercall -= 1
+ if not ok or job['doall']:
+ self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
+ if numbercall:
+ nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
+ ok = True
+ addsql = ''
+ if not numbercall:
+ addsql = ', active=False'
+ cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
+
+ if numbercall:
+ # Reschedule our own main cron thread if necessary.
+ # This is really needed if this job runs longer than its rescheduling period.
+ self._logger.debug("Job `%s` will run again at %s", job['name'], nextcall)
+ nextcall = time.mktime(nextcall.timetuple())
+ openerp.cron.schedule_in_advance(nextcall, cr.dbname)
+ finally:
+ # Return the worker slot even if commit/close raise; a leaked slot would eventually stop all cron processing.
+ try: cr.commit(); cr.close()
+ finally: openerp.cron.inc_thread_count()
+
+ def _run_jobs(self):
+ # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
+ """ Process the cron jobs by spawning worker threads.
+
+ This selects in database all the jobs that should be processed. It then
+ tries to lock each of them and, if it succeeds, spawns a thread to run
+ the job (a failed lock means the job is already being handled by another
+ thread). It stops early when no worker thread slot is left.
+
+ """
+ db = self.pool.db
+ self._logger.debug("Processing cron jobs for database %s", db.dbname)
cr = db.cursor()
+ db_name = db.dbname
try:
- if not pool._init:
- now = datetime.now()
- cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
- for job in cr.dictfetchall():
- nextcall = datetime.strptime(job['nextcall'], '%Y-%m-%d %H:%M:%S')
- numbercall = job['numbercall']
-
- ok = False
- while nextcall < now and numbercall:
- if numbercall > 0:
- numbercall -= 1
- if not ok or job['doall']:
- self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
- if numbercall:
- nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
- ok = True
- addsql = ''
- if not numbercall:
- addsql = ', active=False'
- cr.execute("update ir_cron set nextcall=%s, numbercall=%s"+addsql+" where id=%s", (nextcall.strftime('%Y-%m-%d %H:%M:%S'), numbercall, job['id']))
- cr.commit()
-
-
- cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
+ jobs = {} # mapping job ids to jobs for all jobs being processed.
+ now = datetime.now()
+ cr.execute('select * from ir_cron where numbercall<>0 and active and nextcall<=now() order by priority')
+ for job in cr.dictfetchall():
+ self._logger.debug("%s cron worker thread slot(s) left", openerp.cron.get_thread_count())
+ if not openerp.cron.get_thread_count():
+ break
+ task_cr = db.cursor()
+ task_job = None
+ jobs[job['id']] = job
+
+ try:
+ # Try to lock the job...
+ task_cr.execute('select * from ir_cron where id=%s for update nowait', (job['id'],), log_exceptions=False)
+ task_job = task_cr.dictfetchall()[0]
+ except psycopg2.OperationalError, e:
+ if e.pgcode == '55P03':
+ # Class 55: Object not in prerequisite state, 55P03: lock_not_available
+ # ... and fail.
+ self._logger.debug("Job `%s` is already being processed by another thread", job['name'])
+ continue
+ else:
+ raise
+ finally:
+ if not task_job:
+ task_cr.close()
+
+ # ... and succeed.
+ self._logger.debug("Starting a worker thread for job `%s`", job['name'])
+ task_thread = threading.Thread(target=self._run_job, name=task_job['name'], args=(task_cr, task_job, now))
+ # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
+ task_thread.setDaemon(False)
+ openerp.cron.dec_thread_count()
+ task_thread.start()
+
+ # Wake up time, without considering the currently processed jobs.
+ if jobs.keys():
+ cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active and id not in %s', (tuple(jobs.keys()),))
+ else:
+ cr.execute('select min(nextcall) as min_next_call from ir_cron where numbercall<>0 and active')
next_call = cr.dictfetchone()['min_next_call']
+ self._logger.debug("Next cron wake-up at %s", next_call)
+
if next_call:
next_call = time.mktime(time.strptime(next_call, '%Y-%m-%d %H:%M:%S'))
else:
next_call = int(time.time()) + 3600 # if do not find active cron job from database, it will run again after 1 day
- if not check:
- self.setAlarm(self._poolJobs, next_call, db_name, db_name)
+ openerp.cron.schedule_in_advance(next_call, db_name)
except Exception, ex:
self._logger.warning('Exception in cron:', exc_info=True)
@@ -156,11 +243,6 @@
cr.commit()
cr.close()
- def restart(self, dbname):
- self.cancel(dbname)
- # Reschedule cron processing job asap, but not in the current thread
- self.setAlarm(self._poolJobs, time.time(), dbname, dbname)
-
def update_running_cron(self, cr):
# Verify whether the server is already started and thus whether we need to commit
# immediately our changes and restart the cron agent in order to apply the change
@@ -171,7 +253,7 @@
# when the server is only starting or loading modules (hence the test on pool._init).
if not self.pool._init:
cr.commit()
- self.restart(cr.dbname)
+ openerp.cron.schedule_in_advance(1, self.pool.db.dbname)
def create(self, cr, uid, vals, context=None):
res = super(ir_cron, self).create(cr, uid, vals, context=context)
=== modified file 'openerp/modules/registry.py'
--- openerp/modules/registry.py 2011-06-15 16:01:23 +0000
+++ openerp/modules/registry.py 2011-07-15 09:46:56 +0000
@@ -25,6 +25,8 @@
import openerp.sql_db
import openerp.osv.orm
+import openerp.cron
+import openerp.tools
class Registry(object):
@@ -82,6 +84,17 @@
return res
+ def schedule_cron_jobs(self):
+ """ Make the cron thread care about this registry/database jobs.
+
+ This will initiate the cron thread to check for any pending jobs for
+ this registry/database as soon as possible. Then it will continuously
+ monitor the ir.cron model for future jobs. See openerp.cron for
+ details.
+
+ """
+ openerp.cron.schedule_in_advance(1, self.db.dbname)
+
class RegistryManager(object):
""" Model registries manager.
@@ -95,7 +108,6 @@
# Accessed through the methods below.
registries = {}
-
@classmethod
def get(cls, db_name, force_demo=False, status=None, update_module=False,
pooljobs=True):
@@ -108,7 +120,6 @@
update_module, pooljobs)
return registry
-
@classmethod
def new(cls, db_name, force_demo=False, status=None,
update_module=False, pooljobs=True):
@@ -143,16 +154,32 @@
cr.close()
if pooljobs:
- registry.get('ir.cron').restart(registry.db.dbname)
+ registry.schedule_cron_jobs()
return registry
-
@classmethod
def delete(cls, db_name):
- """ Delete the registry linked to a given database. """
+ """ Delete the registry linked to a given database.
+
+ This also cleans the associated caches. For good measure this also
+ cancels the associated cron job. But please note that the cron job can
+ be running and take some time before ending, and that you should not
+ remove a registry if it can still be used by some thread. So it might
+ be necessary to call openerp.cron.cancel(db_name) yourself and then
+ join (i.e. wait for) the thread.
+
+ """
if db_name in cls.registries:
del cls.registries[db_name]
+ openerp.tools.cache.clean_caches_for_db(db_name)
+ openerp.cron.cancel(db_name)
+
+ @classmethod
+ def delete_all(cls):
+ """ Delete all the registries. """
+ for db_name in cls.registries.keys():
+ cls.delete(db_name)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
=== modified file 'openerp/netsvc.py'
--- openerp/netsvc.py 2011-07-01 23:19:56 +0000
+++ openerp/netsvc.py 2011-07-15 09:46:56 +0000
@@ -21,7 +21,6 @@
##############################################################################
import errno
-import heapq
import logging
import logging.handlers
import os
@@ -37,6 +36,7 @@
# TODO modules that import netsvc only for things from loglevels must be changed to use loglevels.
from loglevels import *
import tools
+import openerp
def close_socket(sock):
""" Closes a socket instance cleanly
@@ -244,81 +244,6 @@
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
-class Agent(object):
- """ Singleton that keeps track of cancellable tasks to run at a given
- timestamp.
-
- The tasks are characterised by:
-
- * a timestamp
- * the database on which the task run
- * the function to call
- * the arguments and keyword arguments to pass to the function
-
- Implementation details:
-
- - Tasks are stored as list, allowing the cancellation by setting
- the timestamp to 0.
- - A heapq is used to store tasks, so we don't need to sort
- tasks ourself.
- """
- __tasks = []
- __tasks_by_db = {}
- _logger = logging.getLogger('netsvc.agent')
-
- @classmethod
- def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
- task = [timestamp, db_name, function, args, kwargs]
- heapq.heappush(cls.__tasks, task)
- cls.__tasks_by_db.setdefault(db_name, []).append(task)
-
- @classmethod
- def cancel(cls, db_name):
- """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
- cls._logger.debug("Cancel timers for %s db", db_name or 'all')
- if db_name is None:
- cls.__tasks, cls.__tasks_by_db = [], {}
- else:
- if db_name in cls.__tasks_by_db:
- for task in cls.__tasks_by_db[db_name]:
- task[0] = 0
-
- @classmethod
- def quit(cls):
- cls.cancel(None)
-
- @classmethod
- def runner(cls):
- """Neverending function (intended to be ran in a dedicated thread) that
- checks every 60 seconds tasks to run. TODO: make configurable
- """
- current_thread = threading.currentThread()
- while True:
- while cls.__tasks and cls.__tasks[0][0] < time.time():
- task = heapq.heappop(cls.__tasks)
- timestamp, dbname, function, args, kwargs = task
- cls.__tasks_by_db[dbname].remove(task)
- if not timestamp:
- # null timestamp -> cancelled task
- continue
- current_thread.dbname = dbname # hack hack
- cls._logger.debug("Run %s.%s(*%s, **%s)", function.im_class.__name__, function.func_name, args, kwargs)
- delattr(current_thread, 'dbname')
- task_thread = threading.Thread(target=function, name='netsvc.Agent.task', args=args, kwargs=kwargs)
- # force non-daemon task threads (the runner thread must be daemon, and this property is inherited by default)
- task_thread.setDaemon(False)
- task_thread.start()
- time.sleep(1)
- time.sleep(60)
-
-def start_agent():
- agent_runner = threading.Thread(target=Agent.runner, name="netsvc.Agent.runner")
- # the agent runner is a typical daemon thread, that will never quit and must be
- # terminated when the main process exits - with no consequence (the processing
- # threads it spawns are not marked daemon)
- agent_runner.setDaemon(True)
- agent_runner.start()
-
import traceback
class Server:
=== modified file 'openerp/pooler.py'
--- openerp/pooler.py 2011-06-15 07:22:31 +0000
+++ openerp/pooler.py 2011-07-15 09:46:56 +0000
@@ -34,11 +34,6 @@
return registry.db, registry
-def delete_pool(db_name):
- """Delete an existing registry."""
- RegistryManager.delete(db_name)
-
-
def restart_pool(db_name, force_demo=False, status=None, update_module=False):
"""Delete an existing registry and return a database connection and a newly initialized registry."""
registry = RegistryManager.new(db_name, force_demo, status, update_module, True)
=== modified file 'openerp/service/web_services.py'
--- openerp/service/web_services.py 2011-06-23 09:04:57 +0000
+++ openerp/service/web_services.py 2011-07-15 09:46:56 +0000
@@ -161,8 +161,8 @@
raise Exception, e
def exp_drop(self, db_name):
+ openerp.modules.registry.RegistryManager.delete(db_name)
sql_db.close_db(db_name)
- openerp.netsvc.Agent.cancel(db_name)
logger = netsvc.Logger()
db = sql_db.db_connect('template1')
@@ -264,8 +264,8 @@
return True
def exp_rename(self, old_name, new_name):
+ openerp.modules.registry.RegistryManager.delete(old_name)
sql_db.close_db(old_name)
- openerp.netsvc.Agent.cancel(db_name)
logger = netsvc.Logger()
db = sql_db.db_connect('template1')
=== modified file 'openerp/sql_db.py'
--- openerp/sql_db.py 2011-07-14 10:55:52 +0000
+++ openerp/sql_db.py 2011-07-15 09:46:56 +0000
@@ -498,9 +498,8 @@
return Connection(_Pool, db_name)
def close_db(db_name):
- """ You might want to call openerp.netsvc.Agent.cancel(db_name) along this function."""
+ """ You might want to call openerp.modules.registry.RegistryManager.delete(db_name) alongside this function."""
_Pool.close_all(dsn(db_name))
- tools.cache.clean_caches_for_db(db_name)
ct = currentThread()
if hasattr(ct, 'dbname'):
delattr(ct, 'dbname')
_______________________________________________
Mailing list: https://launchpad.net/~openerp-dev-gtk
Post to : openerp-dev-gtk@lists.launchpad.net
Unsubscribe : https://launchpad.net/~openerp-dev-gtk
More help : https://help.launchpad.net/ListHelp