on Sun Jun 01 2008, Jonas Borgström <jonas-AT-edgewall.com> wrote:
> David Abrahams wrote: >> on Fri May 30 2008, David Abrahams <dave-AT-boostpro.com> wrote: >> >>>> I have actually been thinking about making Trac use a single global db >>>> pool instead of one pool for each environment in each process. >>> Yep, that's what we need, or something very like it. Of course, each of >>> my environments connects with a different schema and role, so >>> connections aren't interchangeable. I could think about changing that, >>> but I'm thinking the short-term fix is to build an LRU cache of all the >>> connections in any pool, and simply throw out the oldest ones when the >>> cache is full and a new one is requested. >> >> Actually, your approach turned out to be easier. My patched pool.py >> follows > > Hi David, > > When I tested your patch I noticed that the connections returned by the > pool sometimes didn't correspond to the correct database. In my installation most of the instances are hitting the same database with different Postgresql schemas. The one instance that was using a different DB kept reporting that it needed an upgrade even though trac-admin upgrade refused to do anything, so maybe that accounts for what I'm seeing... although "same DB, different schema" is not really all that different from "different DB". > I think it's due to the dormant dictionary key is (thread_id, > connector) which is not enough now when the pool is process wide. I don't understand why. Doesn't the connector identify the database? I understand that it might fail to pool connections that use the same database because they use different connector instances, but that seemed like at worst a missed optimization opportunity. On the other hand, I have to admit I only vaugely understood the code before; the lack of comments made it harder. Your code doesn't have a dormant dict, I notice. > Anyway, I've attached my attempt at a process-wide LRU based connection > pool. It's still a work in progress but it would be very interesting to > know if and how well it works in your setup. I'll try popping it in tonight and we'll see what happens. > I also added some code to close all connections that are unused for more > than 60 seconds. > > One thing I still would like to add is making it possible to share > connections between Trac environments using the same database (and role) > but different schemas. But I think that will require some changes to > other files besides pool.py. Probably that's true. > # -*- coding: utf-8 -*- > # > # Copyright (C) 2005 Edgewall Software > # Copyright (C) 2005 Christopher Lenz <[EMAIL PROTECTED]> > # All rights reserved. > # > # This software is licensed as described in the file COPYING, which > # you should have received as part of this distribution. The terms > # are also available at http://trac.edgewall.org/wiki/TracLicense. > # > # This software consists of voluntary contributions made by many > # individuals. For the exact contribution history, see the revision > # history and logs, available at http://trac.edgewall.org/log/. > # > # Author: Christopher Lenz <[EMAIL PROTECTED]> > > try: > import threading > except ImportError: > import dummy_threading as threading > threading._get_ident = lambda: 0 > import time > > from trac.db.util import ConnectionWrapper > from trac.util.translation import _ > > > class TimeoutError(Exception): > """Exception raised by the connection pool when no connection has become > available after a given timeout.""" > > > class PooledConnection(ConnectionWrapper): > """A database connection that can be pooled. When closed, it gets returned > to the pool. > """ > > def __init__(self, pool, cnx, key, tid): > ConnectionWrapper.__init__(self, cnx) > self._pool = pool > self._key = key > self._tid = tid > > def close(self): > if self.cnx: > self._pool._return_cnx(self.cnx, self._key, self._tid) > self.cnx = None > > def __del__(self): > self.close() > > > def try_rollback(cnx): > """Resets the Connection in a safe way, returning True when it succeeds. > > The rollback we do for safety on a Connection can fail at > critical times because of a timeout on the Connection. > """ > try: > cnx.rollback() # resets the connection > return True > except Exception: > cnx.close() > return False > > > class ConnectionPoolBackend(object): > """A process-wide LRU-based connection pool. > """ > def __init__(self, maxsize): > self._available = threading.Condition(threading.RLock()) > self._maxsize = maxsize > self._active = {} > self._pool = [] > self._pool_key = [] > self._pool_time = [] > > def get_cnx(self, connector, kwargs, timeout=None): > num = 1 > cnx = None > key = unicode(kwargs) > start = time.time() > tid = threading._get_ident() > self._available.acquire() > while True: > try: > # First choice: Return the same cnx already used by the thread > if (tid, key) in self._active: > cnx, num = self._active[(tid, key)] > num += 1 > # Second best option: Reuse a live pooled connection > elif key in self._pool_key: > idx = self._pool_key.index(key) > self._pool_key.pop(idx) > self._pool_time.pop(idx) > cnx = self._pool.pop(idx) > # Third best option: Create a new connection > elif len(self._active) + len(self._pool) < self._maxsize: > cnx = connector.get_connection(**kwargs) > # Forth best option: Replace a pooled connection with a new > one > elif len(self._active) < self._maxsize: > # Remove the LRU connection in the pool > self._pool.pop(0).close() > self._pool_key.pop(0) > self._pool_time.pop(0) > cnx = connector.get_connection(**kwargs) > if cnx: > self._active[(tid, key)] = (cnx, num) > return PooledConnection(self, cnx, key, tid) > # Worst option: wait until a connection pool slot is available > if timeout and (time.time() - start) > timeout: > raise TimeoutError(_('Unable to get database ' > 'connection within %(time)d ' > 'seconds', time=timeout)) > elif timeout: > self._available.wait(timeout) > else: > self._available.wait() > finally: > self._available.release() > > def _return_cnx(self, cnx, key, tid): > self._available.acquire() > try: > assert (tid, key) in self._active > cnx, num = self._active[(tid, key)] > if num == 1 and cnx.poolable and try_rollback(cnx): > del self._active[(tid, key)] > self._pool.append(cnx) > self._pool_key.append(key) > self._pool_time.append(time.time()) > elif num == 1: > del self._active[(tid, key)] > else: > self._active[(tid, key)] = (cnx, num - 1) > finally: > self._available.release() > > def shutdown(self, tid=None): > """Close pooled connections not used in a while""" > when = time.time() - 60 > self._available.acquire() > try: > while self._pool_time and self._pool_time[0] < when: > self._pool.pop(0) > self._pool_key.pop(0) > self._pool_time.pop(0) > finally: > self._available.release() > > backend = ConnectionPoolBackend(5) > > class ConnectionPool(object): > def __init__(self, maxsize, connector, **kwargs): > self._maxsize = maxsize > self._connector = connector > self._kwargs = kwargs > > def get_cnx(self, timeout=None): > return backend.get_cnx(self._connector, self._kwargs, timeout) > > def shutdown(self, tid=None): > backend.shutdown(tid) > -- Dave Abrahams BoostPro Computing http://www.boostpro.com --~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "Trac Development" group. To post to this group, send email to [email protected] To unsubscribe from this group, send email to [EMAIL PROTECTED] For more options, visit this group at http://groups.google.com/group/trac-dev?hl=en -~----------~----~----~----~------~----~------~--~---
