David Abrahams wrote:
> on Sun Jun 01 2008, David Abrahams <dave-AT-boostpro.com> wrote:
>
>   
>> on Sun Jun 01 2008, Jonas Borgström <jonas-AT-edgewall.com> wrote:
>>
>>     
>>> Anyway, I've attached my attempt at a process-wide LRU based connection 
>>> pool. It's still a work in progress but it would be very interesting to 
>>> know if and how well it works in your setup.
>>>       
>> I'll try popping it in tonight and we'll see what happens.
>>     
>
> So far it looks good; it even fixed the two environments that were
> broken by my patch, so you must've been onto something :-).  Obviously I
> never understood pool.py as well as I'd have liked.  Thanks so much for
> working on this.
>   
Yeah, it's a bit tricky and I ended up rewriting most of it to make sure 
I understood it all correctly.
> Are you going to check it in?  For me it fixes a *massive* problem that
> was bringing all of my server's sites down as often as every 12 hrs.
>
>   
Yeah I think so, since it fixes a few long standing issues especially 
Trac's bad habit of collecting a large number of databases connections 
and never releasing them.
So unless someone objects I'm planning to commit this to trunk. It's a 
bit too dangerous to backport this to 0.11-stable this late in the game. 
But if it works well in trunk I think it would be a good candidate for 
0.11.1.

(I've attached an updated version with some cosmetic changes)

/ Jonas

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups "Trac 
Development" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at 
http://groups.google.com/group/trac-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

# -*- coding: utf-8 -*-
#
# Copyright (C) 2005 Edgewall Software
# Copyright (C) 2005 Christopher Lenz <[EMAIL PROTECTED]>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at http://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at http://trac.edgewall.org/log/.
#
# Author: Christopher Lenz <[EMAIL PROTECTED]>

try:
    import threading
except ImportError:
    import dummy_threading as threading
    threading._get_ident = lambda: 0
import time

from trac.db.util import ConnectionWrapper
from trac.util.translation import _


class TimeoutError(Exception):
    """Exception raised by the connection pool when no connection has become
    available after a given timeout."""


class PooledConnection(ConnectionWrapper):
    """A database connection that can be pooled. When closed, it gets returned
    to the pool.
    """

    def __init__(self, pool, cnx, key, tid):
        ConnectionWrapper.__init__(self, cnx)
        self._pool = pool
        self._key = key
        self._tid = tid

    def close(self):
        if self.cnx:
            self._pool._return_cnx(self.cnx, self._key, self._tid)
            self.cnx = None

    def __del__(self):
        self.close()


def try_rollback(cnx):
    """Resets the Connection in a safe way, returning True when it succeeds.
    
    The rollback we do for safety on a Connection can fail at
    critical times because of a timeout on the Connection.
    """
    try:
        cnx.rollback() # resets the connection
        return True
    except Exception:
        cnx.close()
        return False


class ConnectionPoolBackend(object):
    """A process-wide LRU-based connection pool.
    """
    def __init__(self, maxsize):
        self._available = threading.Condition(threading.RLock())
        self._maxsize = maxsize
        self._active = {}
        self._pool = []
        self._pool_key = []
        self._pool_time = []

    def get_cnx(self, connector, kwargs, timeout=None):
        num = 1
        cnx = None
        key = unicode(kwargs)
        start = time.time()
        tid = threading._get_ident()
        self._available.acquire()
        while True:
            try:
                # First choice: Return the same cnx already used by the thread
                if (tid, key) in self._active:
                    cnx, num = self._active[(tid, key)]
                    num += 1
                # Second best option: Reuse a live pooled connection
                elif key in self._pool_key:
                    idx = self._pool_key.index(key)
                    self._pool_key.pop(idx)
                    self._pool_time.pop(idx)
                    cnx = self._pool.pop(idx)
                # Third best option: Create a new connection
                elif len(self._active) + len(self._pool) < self._maxsize:
                    cnx = connector.get_connection(**kwargs)
                # Forth best option: Replace a pooled connection with a new one
                elif len(self._active)  < self._maxsize:
                    # Remove the LRU connection in the pool
                    self._pool.pop(0).close()
                    self._pool_key.pop(0)
                    self._pool_time.pop(0)
                    cnx = connector.get_connection(**kwargs)
                if cnx:
                    self._active[(tid, key)] = (cnx, num)
                    return PooledConnection(self, cnx, key, tid)
                # Worst option: wait until a connection pool slot is available
                if timeout and (time.time() - start) > timeout:
                    raise TimeoutError(_('Unable to get database '
                                         'connection within %(time)d '
                                         'seconds', time=timeout))
                elif timeout:
                    self._available.wait(timeout)
                else:
                    self._available.wait()
            finally:
                self._available.release()

    def _return_cnx(self, cnx, key, tid):
        self._available.acquire()
        try:
            assert (tid, key) in self._active
            cnx, num = self._active[(tid, key)]
            if num == 1 and cnx.poolable and try_rollback(cnx):
                del self._active[(tid, key)]
                self._pool.append(cnx)
                self._pool_key.append(key)
                self._pool_time.append(time.time())
            elif num == 1:
                del self._active[(tid, key)]
            else:
                self._active[(tid, key)] = (cnx, num - 1)
        finally:
            self._available.release()

    def shutdown(self, tid=None):
        """Close pooled connections not used in a while"""
        when = time.time() - 120
        self._available.acquire()
        try:
            while self._pool_time and self._pool_time[0] < when:
                self._pool.pop(0)
                self._pool_key.pop(0)
                self._pool_time.pop(0)
        finally:
            self._available.release()

_backend = ConnectionPoolBackend(5)


class ConnectionPool(object):
    def __init__(self, maxsize, connector, **kwargs):
        # maxsize not used right now but kept for api compatibility
        self._maxsize = maxsize
        self._connector = connector
        self._kwargs = kwargs

    def get_cnx(self, timeout=None):
        return _backend.get_cnx(self._connector, self._kwargs, timeout)

    def shutdown(self, tid=None):
        _backend.shutdown(tid)

Reply via email to