David Abrahams wrote: > on Fri May 30 2008, David Abrahams <dave-AT-boostpro.com> wrote: > >>> I have actually been thinking about making Trac use a single global db >>> pool instead of one pool for each environment in each process. >> Yep, that's what we need, or something very like it. Of course, each of >> my environments connects with a different schema and role, so >> connections aren't interchangeable. I could think about changing that, >> but I'm thinking the short-term fix is to build an LRU cache of all the >> connections in any pool, and simply throw out the oldest ones when the >> cache is full and a new one is requested. > > Actually, your approach turned out to be easier. My patched pool.py > follows
Hi David, When I tested your patch I noticed that the connections returned by the pool sometimes didn't correspond to the correct database. I think it's because the dormant dictionary key is (thread_id, connector), which is no longer enough now that the pool is process-wide. Anyway, I've attached my attempt at a process-wide LRU based connection pool. It's still a work in progress but it would be very interesting to know if and how well it works in your setup. I also added some code to close all connections that are unused for more than 60 seconds. One thing I still would like to add is making it possible to share connections between Trac environments using the same database (and role) but different schemas. But I think that will require some changes to other files besides pool.py. / Jonas --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "Trac Development" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [EMAIL PROTECTED] For more options, visit this group at http://groups.google.com/group/trac-dev?hl=en -~----------~----~----~----~------~----~------~--~---
# -*- coding: utf-8 -*-
#
# Copyright (C) 2005 Edgewall Software
# Copyright (C) 2005 Christopher Lenz <[EMAIL PROTECTED]>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://trac.edgewall.org/log/.
#
# Author: Christopher Lenz <[EMAIL PROTECTED]>

try:
    import threading
except ImportError:
    import dummy_threading as threading
    threading._get_ident = lambda: 0
import time

from trac.db.util import ConnectionWrapper
from trac.util.translation import _


class TimeoutError(Exception):
    """Exception raised by the connection pool when no connection has become
    available after a given timeout."""


class PooledConnection(ConnectionWrapper):
    """A database connection that can be pooled. When closed, it gets returned
    to the pool.
    """

    def __init__(self, pool, cnx, key, tid):
        """Wrap `cnx`, remembering the pool and the `(tid, key)` pair under
        which the pool tracks it."""
        ConnectionWrapper.__init__(self, cnx)
        self._pool = pool
        self._key = key
        self._tid = tid

    def close(self):
        """Hand the wrapped connection back to the pool.

        Calling this again afterwards is a no-op because `cnx` is reset
        to `None`.
        """
        if self.cnx:
            self._pool._return_cnx(self.cnx, self._key, self._tid)
            self.cnx = None

    def __del__(self):
        self.close()


def try_rollback(cnx):
    """Resets the Connection in a safe way, returning True when it succeeds.

    The rollback we do for safety on a Connection can fail at
    critical times because of a timeout on the Connection.
    """
    try:
        cnx.rollback()  # resets the connection
        return True
    except Exception:
        cnx.close()
        return False


class ConnectionPoolBackend(object):
    """A process-wide LRU-based connection pool.

    Connections currently handed out are tracked in `_active`, keyed by
    `(thread id, key)` and mapped to `(cnx, use count)`. Idle connections
    live in three parallel lists (`_pool`, `_pool_key`, `_pool_time`)
    that are appended to on return, so index 0 is always the least
    recently used entry.
    """

    def __init__(self, maxsize):
        """Create an empty pool holding at most `maxsize` connections
        (active and idle combined)."""
        self._available = threading.Condition(threading.RLock())
        self._maxsize = maxsize
        self._active = {}
        self._pool = []
        self._pool_key = []
        self._pool_time = []

    def _close_lru(self):
        """Evict and close the least recently used idle connection.

        All three parallel lists are popped before `close()` is called so
        that they stay in sync even if closing raises. The caller must
        hold `_available`.
        """
        cnx = self._pool.pop(0)
        self._pool_key.pop(0)
        self._pool_time.pop(0)
        cnx.close()

    def get_cnx(self, connector, kwargs, timeout=None):
        """Return a `PooledConnection` for the given connector arguments.

        :param connector: object providing `get_connection(**kwargs)`
        :param kwargs: connection arguments; their string form is the
                       pool key
        :param timeout: maximum number of seconds to wait for a free
                        slot; a false value means wait indefinitely
        :raises TimeoutError: when no slot became free within `timeout`
        """
        num = 1
        cnx = None
        key = unicode(kwargs)
        start = time.time()
        tid = threading._get_ident()
        self._available.acquire()
        try:
            # The lock must be held for the whole loop: `wait()` releases
            # it only while blocked and re-acquires it before returning.
            while True:
                # First choice: Return the same cnx already used by the thread
                if (tid, key) in self._active:
                    cnx, num = self._active[(tid, key)]
                    num += 1
                # Second best option: Reuse a live pooled connection
                elif key in self._pool_key:
                    idx = self._pool_key.index(key)
                    self._pool_key.pop(idx)
                    self._pool_time.pop(idx)
                    cnx = self._pool.pop(idx)
                # Third best option: Create a new connection
                elif len(self._active) + len(self._pool) < self._maxsize:
                    cnx = connector.get_connection(**kwargs)
                # Fourth best option: Replace a pooled connection with a
                # new one
                elif len(self._active) < self._maxsize:
                    # Remove the LRU connection in the pool
                    self._close_lru()
                    cnx = connector.get_connection(**kwargs)
                if cnx:
                    self._active[(tid, key)] = (cnx, num)
                    return PooledConnection(self, cnx, key, tid)
                # Worst option: wait until a connection pool slot is available
                if timeout:
                    # Only wait for what is left of the timeout, so that
                    # repeated wake-ups cannot extend the total wait.
                    remaining = timeout - (time.time() - start)
                    if remaining <= 0:
                        raise TimeoutError(_('Unable to get database '
                                             'connection within %(time)d '
                                             'seconds', time=timeout))
                    self._available.wait(remaining)
                else:
                    self._available.wait()
        finally:
            self._available.release()

    def _return_cnx(self, cnx, key, tid):
        """Release one use of the connection tracked under `(tid, key)`.

        When the last use is released the connection goes back to the idle
        pool if it is poolable and can be rolled back; otherwise it is
        simply forgotten. Either way a slot becomes free, so one waiting
        thread is woken up.
        """
        self._available.acquire()
        try:
            assert (tid, key) in self._active
            cnx, num = self._active[(tid, key)]
            if num == 1 and cnx.poolable and try_rollback(cnx):
                del self._active[(tid, key)]
                self._pool.append(cnx)
                self._pool_key.append(key)
                self._pool_time.append(time.time())
                self._available.notify()
            elif num == 1:
                del self._active[(tid, key)]
                self._available.notify()
            else:
                self._active[(tid, key)] = (cnx, num - 1)
        finally:
            self._available.release()

    def shutdown(self, tid=None):
        """Close pooled connections not used in a while"""
        # `tid` is accepted for interface compatibility but is not used.
        when = time.time() - 60
        self._available.acquire()
        try:
            # The idle lists are ordered oldest first, so stop at the first
            # entry that is recent enough.
            while self._pool_time and self._pool_time[0] < when:
                self._close_lru()
        finally:
            self._available.release()


# Single backend shared by every `ConnectionPool` in the process.
backend = ConnectionPoolBackend(5)


class ConnectionPool(object):
    """Per-connector front end that delegates to the shared `backend`."""

    def __init__(self, maxsize, connector, **kwargs):
        """Remember the connector and its connection arguments.

        `maxsize` is stored but the shared backend uses its own limit.
        """
        self._maxsize = maxsize
        self._connector = connector
        self._kwargs = kwargs

    def get_cnx(self, timeout=None):
        """Obtain a pooled connection from the shared backend."""
        return backend.get_cnx(self._connector, self._kwargs, timeout)

    def shutdown(self, tid=None):
        """Ask the shared backend to close its stale idle connections."""
        backend.shutdown(tid)
