>> I have actually been thinking about making Trac use a single global db 
>> pool instead of one pool for each environment in each process.
>
> Yep, that's what we need, or something very like it.  Of course, each of
> my environments connects with a different schema and role, so
> connections aren't interchangeable.  I could think about changing that,
> but I'm thinking the short-term fix is to build an LRU cache of all the
> connections in any pool, and simply throw out the oldest ones when the
> cache is full and a new one is requested.

Actually, your approach turned out to be easier.  My patched pool.py
follows.


--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups "Trac 
Development" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at 
http://groups.google.com/group/trac-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

# -*- coding: utf-8 -*-
#
# Copyright (C) 2005 Edgewall Software
# Copyright (C) 2005 Christopher Lenz <[EMAIL PROTECTED]>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://trac.edgewall.org/log/.
#
# Author: Christopher Lenz <[EMAIL PROTECTED]>

try:
    import threading
except ImportError:
    import dummy_threading as threading
    threading._get_ident = lambda: 0
import time

from trac.db.util import ConnectionWrapper


class TimeoutError(Exception):
    """Raised by the connection pool when the caller-supplied timeout
    elapses before any database connection could be obtained.
    """


class PooledConnection(ConnectionWrapper):
    """Wrapper around a database connection which belongs to a pool.

    Closing the wrapper does not close the underlying connection: it hands
    the connection back to the pool it was obtained from.
    """

    def __init__(self, pool, cnx, key):
        """Wrap `cnx`, remembering the owning `pool` and the `key` under
        which the pool tracks it (a threadid,connector pair).
        """
        ConnectionWrapper.__init__(self, cnx)
        self._key = key
        self._pool = pool

    def close(self):
        """Return the connection to the pool; does nothing when the
        connection was already returned.
        """
        if not self.cnx:
            return
        self._pool._return_cnx(self.cnx, self._key)
        self.cnx = None

    def __del__(self):
        # Make sure the connection goes back to the pool even when the
        # wrapper is dropped without an explicit close().
        self.close()


def try_rollback(cnx):
    """Resets the Connection in a safe way, returning True when it succeeds.

    The rollback we do for safety on a Connection can fail at
    critical times because of a timeout on the Connection.

    When the rollback fails, the Connection is considered unusable: it gets
    closed and False is returned, so that the caller can discard it. This
    function never raises for a failure of `cnx.rollback()` or `cnx.close()`.
    """
    try:
        cnx.rollback() # resets the connection
        return True
    except Exception:
        # The connection is most likely dead already, in which case closing
        # it can fail as well. That must not prevent us from reporting the
        # failure to the caller, hence the best-effort close.
        try:
            cnx.close()
        except Exception:
            pass
        return False

class ConnectionPool_(object):
    """A very simple connection pool implementation, shared process-wide.

    A single instance (see `instance()`) is meant to serve all the
    `ConnectionPool` front-ends. Connections are tracked under a
    `(thread id, connector)` key. As connections created by different
    connectors are not interchangeable, an idle connection is only ever
    reused for a request made with the same connector.
    """

    _instance = None # the shared pool, created lazily by instance()
    # Protects _instance and all the bookkeeping below; threads which can't
    # get a connection wait on it until some pool slot is freed.
    _available = threading.Condition(threading.RLock())

    @classmethod
    def instance(cls, maxsize):
        """Return the shared pool, creating it on first use.

        `maxsize` is only taken into account by the call which actually
        creates the pool; later calls return the existing pool unchanged.
        """
        cls._available.acquire()
        try:
            if cls._instance is None:
                cls._instance = ConnectionPool_(maxsize)
            return cls._instance
        finally:
            cls._available.release()

    def __init__(self, maxsize):
        self._dormant = {} # inactive connections in pool, by key
        self._active = {} # key -> [use count, connection]
        self._maxsize = maxsize # maximum pool size
        self._cursize = 0 # current pool size, includes active connections

    def get_cnx(self, timeout, connector, kwargs):
        """Return a `PooledConnection` for the calling thread.

        The lookup order is: the connection already active for this thread
        and `connector`, then an idle connection created by the same
        `connector`, then a new connection obtained by calling
        `connector.get_connection(**kwargs)`. When the pool is full, an idle
        connection belonging to another connector is closed to make room;
        if there's none, the call blocks until a slot is freed.

        :param timeout: maximum number of seconds to wait for a connection;
                        a false value means wait indefinitely
        :raises TimeoutError: if no connection could be obtained within
                              `timeout` seconds
        """
        start = time.time()
        self._available.acquire()
        try:
            key = (threading._get_ident(), connector)
            if key in self._active:
                num, cnx = self._active.get(key)
                if num == 0: # was pushed back (see _cleanup)
                    if not try_rollback(cnx):
                        # the connection is gone, so is its pool slot
                        del self._active[key]
                        self._cursize -= 1
                        cnx = None
                if cnx:
                    self._active[key][0] = num + 1
                    return PooledConnection(self, cnx, key)
            while True:
                cnx = self._pop_dormant(key)
                if cnx is not None:
                    if try_rollback(cnx):
                        break
                    self._cursize -= 1 # dead connection, look further
                elif self._maxsize and self._cursize < self._maxsize:
                    cnx = connector.get_connection(**kwargs)
                    self._cursize += 1
                    break
                elif self._dormant:
                    # The pool is full, but some of its connections are
                    # idle ones we can't use: trade one for a new slot.
                    self._evict_dormant()
                elif timeout:
                    # Only wait for what is left of the timeout, as we may
                    # get here several times before getting a connection.
                    remaining = timeout - (time.time() - start)
                    if remaining <= 0:
                        raise TimeoutError('Unable to get database '
                                           'connection within %d seconds'
                                            % timeout)
                    self._available.wait(remaining)
                else: # Warning: without timeout, Trac *might* hang
                    self._available.wait()
            self._active[key] = [1, cnx]
            return PooledConnection(self, cnx, key)
        finally:
            self._available.release()

    def _pop_dormant(self, key):
        """Remove and return an idle connection usable for `key`, or `None`
        if there's no such connection.

        Note: self._available *must* be acquired when calling this one.
        """
        if key in self._dormant: # prefer same thread
            return self._dormant.pop(key)
        connector = key[1]
        for candidate in self._dormant:
            if candidate[1] == connector: # same connector, other thread
                # returning right away, so mutating the dict is safe here
                return self._dormant.pop(candidate)
        return None

    def _evict_dormant(self):
        """Close an arbitrary idle connection and free its pool slot.

        Note: self._available *must* be acquired when calling this one.
        """
        cnx = self._dormant.popitem()[1]
        self._cursize -= 1
        try:
            cnx.close()
        except Exception:
            pass # the connection is discarded anyway

    def _return_cnx(self, cnx, key):
        """Give back one use of `cnx`, as done by `PooledConnection.close()`.

        The connection only leaves the active set once its use count drops
        to zero.
        """
        self._available.acquire()
        try:
            if key in self._active:
                num, cnx_ = self._active.get(key)
                if cnx is cnx_:
                    if num > 1:
                        self._active[key][0] = num - 1
                    else:
                        self._cleanup(key)
                # otherwise, cnx was already cleaned up during a shutdown(*key),
                # and in the meantime, `key[0]` (a thread id) has been reused (#3504)
        finally:
            self._available.release()

    def _cleanup(self, key):
        """Retire the active connection registered under `key`, if any.

        Note: self._available *must* be acquired when calling this one.
        """
        if key in self._active:
            cnx = self._active.pop(key)[1]
            assert key not in self._dormant # hm, how could that happen?
            if cnx.poolable: # i.e. we can manipulate it from other threads
                if try_rollback(cnx):
                    self._dormant[key] = cnx
                else:
                    self._cursize -= 1
            elif key[0] == threading._get_ident():
                if try_rollback(cnx): # non-poolable but same thread: close
                    cnx.close()
                self._cursize -= 1
            else: # non-poolable, different thread: push it back
                self._active[key] = [0, cnx]
            self._available.notify()

    def shutdown(self, tid, connector):
        """Release the connections created by `connector`.

        With a `tid`, only the active connection of that thread is cleaned
        up. Without it, all the active connections of `connector` are
        cleaned up and its idle connections are closed and removed from the
        pool.
        """
        self._available.acquire()
        try:
            if tid:
                cleanup_list = [(tid, connector)]
            else:
                cleanup_list = [
                    x for x in self._active.keys()
                    if x[1] == connector ]

            for key in cleanup_list:
                self._cleanup(key)

            if not tid:
                # Collect the keys first, as we can't remove entries while
                # iterating over the dictionary.
                stale = [key for key in self._dormant if key[1] == connector]
                if stale:
                    # the freed slots can be claimed by waiting threads
                    self._available.notify(len(stale))
                for key in stale:
                    # Forget the connection before closing it, so that the
                    # bookkeeping stays right even if close() fails.
                    cnx = self._dormant.pop(key)
                    self._cursize -= 1
                    cnx.close()
        finally:
            self._available.release()

class ConnectionPool(object):
    """Front-end to the shared `ConnectionPool_` instance, bound to one
    connector and to the keyword arguments used for creating connections.
    """

    def __init__(self, maxsize, connector, **kwargs):
        """Bind to the shared pool obtained from
        `ConnectionPool_.instance(maxsize)`.
        """
        self.impl = ConnectionPool_.instance(maxsize)
        self._connector = connector
        self._kwargs = kwargs

    def get_cnx(self, timeout=None):
        """Get a connection for this pool's connector from the shared pool,
        passing `timeout` along unchanged.
        """
        pool = self.impl
        return pool.get_cnx(timeout, self._connector, self._kwargs)

    def shutdown(self, tid=None):
        """Ask the shared pool to shut down this pool's connector, passing
        `tid` along unchanged.
        """
        pool = self.impl
        return pool.shutdown(tid, self._connector)
            
    
-- 
Dave Abrahams
BoostPro Computing
http://www.boostpro.com

Reply via email to