Antony Lesuisse (OpenERP) has proposed merging 
lp:~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt into 
lp:openobject-server.

Requested reviews:
  OpenERP Core Team (openerp)

For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt/+merge/92150

multiprocess signaling, wip
-- 
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt/+merge/92150
Your team OpenERP R&D Team is subscribed to branch 
lp:~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt.
=== modified file 'gunicorn.conf.py'
--- gunicorn.conf.py	2012-01-20 17:20:18 +0000
+++ gunicorn.conf.py	2012-02-08 22:20:24 +0000
@@ -17,11 +17,10 @@
 # Gunicorn recommends 2-4 x number_of_cpu_cores, but
 # you'll want to vary this a bit to find the best for your
 # particular work load.
-workers = 4
+workers = 10
 
 # Some application-wide initialization is needed.
 on_starting = openerp.wsgi.on_starting
-when_ready = openerp.wsgi.when_ready
 pre_request = openerp.wsgi.pre_request
 post_request = openerp.wsgi.post_request
 
@@ -31,6 +30,8 @@
 
 max_requests = 2000
 
+#accesslog = '/tmp/blah.txt'
+
 # Equivalent of --load command-line option
 openerp.conf.server_wide_modules = ['web']
 
@@ -39,7 +40,7 @@
 
 # Path to the OpenERP Addons repository (comma-separated for
 # multiple locations)
-conf['addons_path'] = '/home/openerp/addons/trunk,/home/openerp/web/trunk/addons'
+conf['addons_path'] = '/home/thu/repos/addons/trunk,/home/thu/repos/web/trunk/addons'
 
 # Optional database config if not using local socket
 #conf['db_name'] = 'mycompany'

=== modified file 'openerp-server'
--- openerp-server	2012-02-06 20:31:51 +0000
+++ openerp-server	2012-02-08 22:20:24 +0000
@@ -245,7 +245,7 @@
             # Call any post_load hook.
             info = openerp.modules.module.load_information_from_description_file(m)
             if info['post_load']:
-                getattr(sys.modules[m], info['post_load'])()
+                getattr(sys.modules['openerp.addons.' + m], info['post_load'])()
         except Exception:
             msg = ''
             if m == 'web':

=== modified file 'openerp/__init__.py'
--- openerp/__init__.py	2011-09-27 16:51:33 +0000
+++ openerp/__init__.py	2012-02-08 22:20:24 +0000
@@ -45,5 +45,12 @@
 import workflow
 import wsgi
 
+# Is the server running in multi-process mode (e.g. behind Gunicorn).
+# If this is True, the processes have to communicate some events,
+# e.g. database update or cache invalidation. Each process has also
+# its own copy of the data structure and we don't need to care about
+# locks between threads.
+multi_process = False
+
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
 

=== modified file 'openerp/addons/base/base.sql'
--- openerp/addons/base/base.sql	2012-01-30 09:52:38 +0000
+++ openerp/addons/base/base.sql	2012-02-08 22:20:24 +0000
@@ -347,6 +347,16 @@
     res_id integer, primary key(id)
 );
 
+-- Inter-process signaling:
+-- The `base_registry_signaling` sequence indicates the whole registry
+-- must be reloaded.
+-- The `base_cache_signaling` sequence indicates all caches must be
+-- invalidated (i.e. cleared).
+CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1;
+SELECT nextval('base_registry_signaling');
+CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1;
+SELECT nextval('base_cache_signaling');
+
 ---------------------------------
 -- Users
 ---------------------------------

=== modified file 'openerp/addons/base/ir/ir_ui_menu.py'
--- openerp/addons/base/ir/ir_ui_menu.py	2012-01-25 11:58:23 +0000
+++ openerp/addons/base/ir/ir_ui_menu.py	2012-02-08 22:20:24 +0000
@@ -40,69 +40,57 @@
 class ir_ui_menu(osv.osv):
     _name = 'ir.ui.menu'
 
-    def __init__(self, *args, **kwargs):
-        self.cache_lock = threading.RLock()
-        self.clear_cache()
-        r = super(ir_ui_menu, self).__init__(*args, **kwargs)
-        self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')
-        return r
-
-    def clear_cache(self):
-        with self.cache_lock:
-            # radical but this doesn't frequently happen
-            self._cache = {}
-
     def _filter_visible_menus(self, cr, uid, ids, context=None):
         """Filters the give menu ids to only keep the menu items that should be
            visible in the menu hierarchy of the current user.
            Uses a cache for speeding up the computation.
         """
-        with self.cache_lock:
-            modelaccess = self.pool.get('ir.model.access')
-            user_groups = set(self.pool.get('res.users').read(cr, 1, uid, ['groups_id'])['groups_id'])
-            result = []
-            for menu in self.browse(cr, uid, ids, context=context):
-                # this key works because user access rights are all based on user's groups (cfr ir_model_access.check)
-                key = (cr.dbname, menu.id, tuple(user_groups))
-                if key in self._cache:
-                    if self._cache[key]:
-                        result.append(menu.id)
-                    #elif not menu.groups_id and not menu.action:
-                    #    result.append(menu.id)
-                    continue
-
-                self._cache[key] = False
-                if menu.groups_id:
-                    restrict_to_groups = [g.id for g in menu.groups_id]
-                    if not user_groups.intersection(restrict_to_groups):
-                        continue
-                    #result.append(menu.id)
-                    #self._cache[key] = True
-                    #continue
-
-                if menu.action:
-                    # we check if the user has access to the action of the menu
-                    data = menu.action
-                    if data:
-                        model_field = { 'ir.actions.act_window':    'res_model',
-                                        'ir.actions.report.xml':    'model',
-                                        'ir.actions.wizard':        'model',
-                                        'ir.actions.server':        'model_id',
-                                      }
-
-                        field = model_field.get(menu.action._name)
-                        if field and data[field]:
-                            if not modelaccess.check(cr, uid, data[field], 'read', False):
-                                continue
-                else:
-                    # if there is no action, it's a 'folder' menu
-                    if not menu.child_id:
-                        # not displayed if there is no children
-                        continue
-
-                result.append(menu.id)
-                self._cache[key] = True
-            return result
+        _cache = {}
+        modelaccess = self.pool.get('ir.model.access')
+        user_groups = set(self.pool.get('res.users').read(cr, 1, uid, ['groups_id'])['groups_id'])
+        result = []
+        for menu in self.browse(cr, uid, ids, context=context):
+            # this key works because user access rights are all based on user's groups (cfr ir_model_access.check)
+            key = (cr.dbname, menu.id, tuple(user_groups))
+            if key in _cache:
+                if _cache[key]:
+                    result.append(menu.id)
+                #elif not menu.groups_id and not menu.action:
+                #    result.append(menu.id)
+                continue
+
+            _cache[key] = False
+            if menu.groups_id:
+                restrict_to_groups = [g.id for g in menu.groups_id]
+                if not user_groups.intersection(restrict_to_groups):
+                    continue
+                #result.append(menu.id)
+                #_cache[key] = True
+                #continue
+
+            if menu.action:
+                # we check if the user has access to the action of the menu
+                data = menu.action
+                if data:
+                    model_field = { 'ir.actions.act_window':    'res_model',
+                                    'ir.actions.report.xml':    'model',
+                                    'ir.actions.wizard':        'model',
+                                    'ir.actions.server':        'model_id',
+                                  }
+
+                    field = model_field.get(menu.action._name)
+                    if field and data[field]:
+                        if not modelaccess.check(cr, uid, data[field], 'read', False):
+                            continue
+            else:
+                # if there is no action, it's a 'folder' menu
+                if not menu.child_id:
+                    # not displayed if there is no children
+                    continue
+
+            result.append(menu.id)
+            _cache[key] = True
+        return result
 
     def search(self, cr, uid, args, offset=0, limit=None, order=None, context=None, count=False):
         if context is None:
@@ -146,18 +134,6 @@
             parent_path = ''
         return parent_path + menu.name
 
-    def create(self, *args, **kwargs):
-        self.clear_cache()
-        return super(ir_ui_menu, self).create(*args, **kwargs)
-
-    def write(self, *args, **kwargs):
-        self.clear_cache()
-        return super(ir_ui_menu, self).write(*args, **kwargs)
-
-    def unlink(self, *args, **kwargs):
-        self.clear_cache()
-        return super(ir_ui_menu, self).unlink(*args, **kwargs)
-
     def copy(self, cr, uid, id, default=None, context=None):
         ir_values_obj = self.pool.get('ir.values')
         res = super(ir_ui_menu, self).copy(cr, uid, id, context=context)

=== modified file 'openerp/addons/base/module/module.py'
--- openerp/addons/base/module/module.py	2012-02-02 09:26:34 +0000
+++ openerp/addons/base/module/module.py	2012-02-08 22:20:24 +0000
@@ -30,6 +30,7 @@
 import zipfile
 import zipimport
 
+import openerp
 import openerp.modules as addons
 import pooler
 import release
@@ -344,6 +345,7 @@
         if to_install_ids:
             self.button_install(cr, uid, to_install_ids, context=context)
 
+        openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname)
         return dict(ACTION_DICT, name=_('Install'))
 
     def button_immediate_install(self, cr, uid, ids, context=None):

=== modified file 'openerp/modules/registry.py'
--- openerp/modules/registry.py	2012-01-24 12:42:52 +0000
+++ openerp/modules/registry.py	2012-02-08 22:20:24 +0000
@@ -50,6 +50,9 @@
         self._init_parent = {}
         self.db_name = db_name
         self.db = openerp.sql_db.db_connect(db_name)
+        # Flag indicating if at least one model cache has been cleared.
+        # Useful only in a multi-process context.
+        self._any_cache_cleared = False
 
         cr = self.db.cursor()
         has_unaccent = openerp.modules.db.has_unaccent(cr)
@@ -115,6 +118,14 @@
         for model in self.models.itervalues():
             model.clear_caches()
 
+    # Useful only in a multi-process context.
+    def reset_any_cache_cleared(self):
+        self._any_cache_cleared = False
+
+    # Useful only in a multi-process context.
+    def any_cache_cleared(self):
+        return self._any_cache_cleared
+
 class RegistryManager(object):
     """ Model registries manager.
 
@@ -127,6 +138,15 @@
     registries = {}
     registries_lock = threading.RLock()
 
+    # Inter-process signaling (used only when openerp.multi_process is True):
+    # The `base_registry_signaling` sequence indicates the whole registry
+    # must be reloaded.
+    # The `base_cache_signaling` sequence indicates all caches must be
+    # invalidated (i.e. cleared).
+    # TODO per registry
+    base_registry_signaling_sequence = 1
+    base_cache_signaling_sequence = 1
+
     @classmethod
     def get(cls, db_name, force_demo=False, status=None, update_module=False,
             pooljobs=True):
@@ -215,5 +235,64 @@
             if db_name in cls.registries:
                 cls.registries[db_name].clear_caches()
 
+    @classmethod
+    def check_registry_signaling(cls, db_name):
+        if openerp.multi_process:
+            # Check if the model registry must be reloaded (e.g. after the
+            # database has been updated by another process).
+            cr = openerp.sql_db.db_connect(db_name).cursor()
+            registry_reloaded = False
+            try:
+                cr.execute('SELECT last_value FROM base_registry_signaling')
+                r = cr.fetchone()[0]
+                if cls.base_registry_signaling_sequence != r:
+                    _logger.info("Reloading the model registry after database signaling.")
+                    cls.base_registry_signaling_sequence = r
+                    # Don't run the cron in the Gunicorn worker.
+                    cls.new(db_name, pooljobs=False)
+                    registry_reloaded = True
+            finally:
+                cr.close()
+
+            # Check if the model caches must be invalidated (e.g. after a write
+            # occurred on another process). Don't clear right after a registry
+            # has been reloaded.
+            cr = openerp.sql_db.db_connect(db_name).cursor()
+            try:
+                cr.execute('SELECT last_value FROM base_cache_signaling')
+                r = cr.fetchone()[0]
+                if cls.base_cache_signaling_sequence != r and not registry_reloaded:
+                    _logger.info("Invalidating all model caches after database signaling.")
+                    cls.base_cache_signaling_sequence = r
+                    registry = cls.get(db_name, pooljobs=False)
+                    registry.clear_caches()
+            finally:
+                cr.close()
+
+    @classmethod
+    def signal_caches_change(cls, db_name):
+        if openerp.multi_process:
+            # Check the registries if any cache has been cleared and signal it
+            # through the database to other processes.
+            registry = cls.get(db_name, pooljobs=False)
+            if registry.any_cache_cleared():
+                _logger.info("At least one model cache has been cleare, signaling through the database.")
+                cr = openerp.sql_db.db_connect(db_name).cursor()
+                try:
+                    pass
+                    # cr.execute("select nextval('base_cache_signaling')")
+                    # cls.base_cache_signaling_sequence to = result
+                finally:
+                    cr.close()
+                registry.reset_any_cache_cleared()
+
+    @classmethod
+    def signal_registry_change(cls, db_name):
+        cr = openerp.sql_db.db_connect(db_name).cursor()
+        try:
+            cr.execute("select nextval('base_registry_signaling')")
+        finally:
+            cr.close()
+        #cls.base_registry_signaling_sequence to = result
 
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

=== modified file 'openerp/osv/orm.py'
--- openerp/osv/orm.py	2012-02-02 09:26:34 +0000
+++ openerp/osv/orm.py	2012-02-08 22:20:24 +0000
@@ -2384,6 +2384,7 @@
         try:
             getattr(self, '_ormcache')
             self._ormcache = {}
+            self.pool._any_cache_cleared = True
         except AttributeError:
             pass
 

=== modified file 'openerp/service/__init__.py'
--- openerp/service/__init__.py	2012-01-24 15:07:50 +0000
+++ openerp/service/__init__.py	2012-02-08 22:20:24 +0000
@@ -65,7 +65,7 @@
     netrpc_server.init_servers()
 
     # Start the main cron thread.
-    openerp.cron.start_master_thread()
+    #openerp.cron.start_master_thread()
 
     # Start the top-level servers threads (normally HTTP, HTTPS, and NETRPC).
     openerp.netsvc.Server.startAll()

=== modified file 'openerp/service/web_services.py'
--- openerp/service/web_services.py	2012-02-08 10:59:13 +0000
+++ openerp/service/web_services.py	2012-02-08 22:20:24 +0000
@@ -568,8 +568,10 @@
             raise NameError("Method not available %s" % method)
         security.check(db,uid,passwd)
         assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
         fn = getattr(openerp.osv.osv.service, method)
         res = fn(db, uid, *params)
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res
 
 
@@ -649,8 +651,10 @@
         if method not in ['report', 'report_get', 'render_report']:
             raise KeyError("Method not supported %s" % method)
         security.check(db,uid,passwd)
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
         fn = getattr(self, 'exp_' + method)
         res = fn(db, uid, *params)
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res
 
     def exp_render_report(self, db, uid, object, ids, datas=None, context=None):

=== modified file 'openerp/tools/cache.py'
--- openerp/tools/cache.py	2011-11-22 08:58:48 +0000
+++ openerp/tools/cache.py	2012-02-08 22:20:24 +0000
@@ -57,10 +57,12 @@
             try:
                 key = args[self.skiparg-2:]
                 del d[key]
+                self2.pool._any_cache_cleared = True
             except KeyError:
                 pass
         else:
             d.clear()
+            self2.pool._any_cache_cleared = True
 
 class ormcache_multi(ormcache):
     def __init__(self, skiparg=2, size=8192, multi=3):

=== modified file 'openerp/wsgi.py'
--- openerp/wsgi.py	2012-01-24 15:14:51 +0000
+++ openerp/wsgi.py	2012-02-08 22:20:24 +0000
@@ -438,7 +438,7 @@
 
     The WSGI server can be shutdown with stop_server() below.
     """
-    threading.Thread(target=openerp.wsgi.serve).start()
+    threading.Thread(name='WSGI server', target=openerp.wsgi.serve).start()
 
 def stop_server():
     """ Initiate the shutdown of the WSGI server.
@@ -456,7 +456,7 @@
 def on_starting(server):
     global arbiter_pid
     arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable
-    #openerp.tools.cache = kill_workers_cache
+    openerp.multi_process = True # Yay!
     openerp.netsvc.init_logger()
     openerp.osv.osv.start_object_proxy()
     openerp.service.web_services.start_web_services()
@@ -464,11 +464,11 @@
     openerp.modules.loading.open_openerp_namespace()
     for m in openerp.conf.server_wide_modules:
         try:
-            __import__(m)
+            __import__('openerp.addons.' + m)
             # Call any post_load hook.
             info = openerp.modules.module.load_information_from_description_file(m)
             if info['post_load']:
-                getattr(sys.modules[m], info['post_load'])()
+                getattr(sys.modules['openerp.addons.' + m], info['post_load'])()
         except Exception:
             msg = ''
             if m == 'web':
@@ -477,11 +477,6 @@
 Maybe you forgot to add those addons in your addons_path configuration."""
             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
 
-# Install our own signal handler on the master process.
-def when_ready(server):
-    # Hijack gunicorn's SIGWINCH handling; we can choose another one.
-    signal.signal(signal.SIGWINCH, make_winch_handler(server))
-
 # Install limits on virtual memory and CPU time consumption.
 def pre_request(worker, req):
     import os
@@ -509,30 +504,9 @@
             'too high, rebooting the worker.')
         worker.alive = False # Commit suicide after the request.
 
-# Our signal handler will signal a SGIQUIT to all workers.
-def make_winch_handler(server):
-    def handle_winch(sig, fram):
-        server.kill_workers(signal.SIGQUIT) # This is gunicorn specific.
-    return handle_winch
-
 # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
 def time_expired(n, stack):
     _logger.info('CPU time limit exceeded.')
     raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception
 
-# Kill gracefuly the workers (e.g. because we want to clear their cache).
-# This is done by signaling a SIGWINCH to the master process, so it can be
-# called by the workers themselves.
-def kill_workers():
-    try:
-        os.kill(arbiter_pid, signal.SIGWINCH)
-    except OSError, e:
-        if e.errno == errno.ESRCH: # no such pid
-            return
-        raise
-
-class kill_workers_cache(openerp.tools.ormcache):
-    def clear(self, dbname, *args, **kwargs):
-        kill_workers()
-
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

_______________________________________________
Mailing list: https://launchpad.net/~openerp-dev-gtk
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~openerp-dev-gtk
More help   : https://help.launchpad.net/ListHelp

Reply via email to