>> I have actually been thinking about making Trac use a single global db >> pool instead of one pool for each environment in each process. > > Yep, that's what we need, or something very like it. Of course, each of > my environments connects with a different schema and role, so > connections aren't interchangeable. I could think about changing that, > but I'm thinking the short-term fix is to build an LRU cache of all the > connections in any pool, and simply throw out the oldest ones when the > cache is full and a new one is requested.
Actually, your approach turned out to be easier. My patched pool.py follows --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "Trac Development" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [EMAIL PROTECTED] For more options, visit this group at http://groups.google.com/group/trac-dev?hl=en -~----------~----~----~----~------~----~------~--~---
# -*- coding: utf-8 -*- # # Copyright (C) 2005 Edgewall Software # Copyright (C) 2005 Christopher Lenz <[EMAIL PROTECTED]> # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at http://trac.edgewall.org/wiki/TracLicense. # # This software consists of voluntary contributions made by many # individuals. For the exact contribution history, see the revision # history and logs, available at http://trac.edgewall.org/log/. # # Author: Christopher Lenz <[EMAIL PROTECTED]> try: import threading except ImportError: import dummy_threading as threading threading._get_ident = lambda: 0 import time from trac.db.util import ConnectionWrapper class TimeoutError(Exception): """Exception raised by the connection pool when no connection has become available after a given timeout.""" class PooledConnection(ConnectionWrapper): """A database connection that can be pooled. When closed, it gets returned to the pool. """ def __init__(self, pool, cnx, key): # The key will be a threadid,connector pair ConnectionWrapper.__init__(self, cnx) self._pool = pool self._key = key def close(self): if self.cnx: self._pool._return_cnx(self.cnx, self._key) self.cnx = None def __del__(self): self.close() def try_rollback(cnx): """Resets the Connection in a safe way, returning True when it succeeds. The rollback we do for safety on a Connection can fail at critical times because of a timeout on the Connection. """ try: cnx.rollback() # resets the connection return True except Exception: cnx.close() return False class ConnectionPool_(object): """A very simple connection pool implementation.""" _instance = None _available = threading.Condition(threading.RLock()) @classmethod def instance(cls, maxsize): cls._available.acquire() try: if cls._instance is None: cls._instance = ConnectionPool_(maxsize) return cls._instance finally: cls._available.release() def __init__(self, maxsize): self._dormant = {} # inactive connections in pool self._active = {} # active connections by thread ID self._maxsize = maxsize # maximum pool size self._cursize = 0 # current pool size, includes active connections def get_cnx(self, timeout, connector, kwargs): start = time.time() self._available.acquire() try: key = (threading._get_ident(),connector) if key in self._active: num, cnx = self._active.get(key) if num == 0: # was pushed back (see _cleanup) if not try_rollback(cnx): del self._active[key] cnx = None if cnx: self._active[key][0] = num + 1 return PooledConnection(self, cnx, key) while True: if self._dormant: if key in self._dormant: # prefer same thread cnx = self._dormant.pop(key) else: # pick a random one cnx = self._dormant.pop(self._dormant.keys()[0]) if try_rollback(cnx): break else: self._cursize -= 1 elif self._maxsize and self._cursize < self._maxsize: cnx = connector.get_connection(**kwargs) self._cursize += 1 break else: if timeout: if (time.time() - start) >= timeout: raise TimeoutError('Unable to get database ' 'connection within %d seconds' % timeout) self._available.wait(timeout) else: # Warning: without timeout, Trac *might* hang self._available.wait() self._active[key] = [1, cnx] return PooledConnection(self, cnx, key) finally: self._available.release() def _return_cnx(self, cnx, key): self._available.acquire() try: if key in self._active: num, cnx_ = self._active.get(key) if cnx is cnx_: if num > 1: self._active[key][0] = num - 1 else: self._cleanup(key) # otherwise, cnx was already cleaned up during a shutdown(*key), # and in the meantime, `key[0]` (a thread id) has been reused (#3504) finally: self._available.release() def _cleanup(self, key): """Note: self._available *must* be acquired when calling this one.""" if key in self._active: cnx = self._active.pop(key)[1] assert key not in self._dormant # hm, how could that happen? if cnx.poolable: # i.e. we can manipulate it from other threads if try_rollback(cnx): self._dormant[key] = cnx else: self._cursize -= 1 elif key[0] == threading._get_ident(): if try_rollback(cnx): # non-poolable but same thread: close cnx.close() self._cursize -= 1 else: # non-poolable, different thread: push it back self._active[key] = [0, cnx] self._available.notify() def shutdown(self, tid, connector): self._available.acquire() try: if tid: cleanup_list = [(tid,connector)] else: cleanup_list = [ x for x in self._active.keys() if x[1] == connector ] for key in cleanup_list: self._cleanup(key) if not tid: for key, cnx in self._dormant.iteritems(): if key[1] == connector: cnx.close() finally: self._available.release() class ConnectionPool(object): def __init__(self, maxsize, connector, **kwargs): self._connector = connector self._kwargs = kwargs self.impl = ConnectionPool_.instance(maxsize) def get_cnx(self, timeout=None): return self.impl.get_cnx(timeout, self._connector, self._kwargs) def shutdown(self, tid=None): return self.impl.shutdown(tid, self._connector)
-- Dave Abrahams BoostPro Computing http://www.boostpro.com
