on Sun Jun 01 2008, Jonas Borgström <jonas-AT-edgewall.com> wrote:

> David Abrahams wrote:
>> on Fri May 30 2008, David Abrahams <dave-AT-boostpro.com> wrote: 
>> 
>>>> I have actually been thinking about making Trac use a single global db 
>>>> pool instead of one pool for each environment in each process.
>>> Yep, that's what we need, or something very like it.  Of course, each of
>>> my environments connects with a different schema and role, so
>>> connections aren't interchangeable.  I could think about changing that,
>>> but I'm thinking the short-term fix is to build an LRU cache of all the
>>> connections in any pool, and simply throw out the oldest ones when the
>>> cache is full and a new one is requested.
>> 
>> Actually, your approach turned out to be easier.  My patched pool.py
>> follows
>
> Hi David,
>
> When I tested your patch I noticed that the connections returned by the 
> pool sometimes didn't correspond to the correct database. 

In my installation most of the instances are hitting the same database
with different Postgresql schemas.  The one instance that was using a
different DB kept reporting that it needed an upgrade even though
trac-admin upgrade refused to do anything, so maybe that accounts for
what I'm seeing... although "same DB, different schema" is not really
all that different from "different DB".

> I think it's due to the dormant dictionary key is (thread_id,
> connector) which is not enough now when the pool is process wide.

I don't understand why.  Doesn't the connector identify the database?  I
understand that it might fail to pool connections that use the same
database because they use different connector instances, but that seemed
like at worst a missed optimization opportunity.

On the other hand, I have to admit I only vaugely understood the code
before; the lack of comments made it harder.  Your code doesn't have a
dormant dict, I notice.

> Anyway, I've attached my attempt at a process-wide LRU based connection 
> pool. It's still a work in progress but it would be very interesting to 
> know if and how well it works in your setup.

I'll try popping it in tonight and we'll see what happens.

> I also added some code to close all connections that are unused for more 
> than 60 seconds.
>
> One thing I still would like to add is making it possible to share 
> connections between Trac environments using the same database (and role) 
> but different schemas. But I think that will require some changes to 
> other files besides pool.py.

Probably that's true.
> # -*- coding: utf-8 -*-
> #
> # Copyright (C) 2005 Edgewall Software
> # Copyright (C) 2005 Christopher Lenz <[EMAIL PROTECTED]>
> # All rights reserved.
> #
> # This software is licensed as described in the file COPYING, which
> # you should have received as part of this distribution. The terms
> # are also available at http://trac.edgewall.org/wiki/TracLicense.
> #
> # This software consists of voluntary contributions made by many
> # individuals. For the exact contribution history, see the revision
> # history and logs, available at http://trac.edgewall.org/log/.
> #
> # Author: Christopher Lenz <[EMAIL PROTECTED]>
>
> try:
>     import threading
> except ImportError:
>     import dummy_threading as threading
>     threading._get_ident = lambda: 0
> import time
>
> from trac.db.util import ConnectionWrapper
> from trac.util.translation import _
>
>
> class TimeoutError(Exception):
>     """Exception raised by the connection pool when no connection has become
>     available after a given timeout."""
>
>
> class PooledConnection(ConnectionWrapper):
>     """A database connection that can be pooled. When closed, it gets returned
>     to the pool.
>     """
>
>     def __init__(self, pool, cnx, key, tid):
>         ConnectionWrapper.__init__(self, cnx)
>         self._pool = pool
>         self._key = key
>         self._tid = tid
>
>     def close(self):
>         if self.cnx:
>             self._pool._return_cnx(self.cnx, self._key, self._tid)
>             self.cnx = None
>
>     def __del__(self):
>         self.close()
>
>
> def try_rollback(cnx):
>     """Resets the Connection in a safe way, returning True when it succeeds.
>     
>     The rollback we do for safety on a Connection can fail at
>     critical times because of a timeout on the Connection.
>     """
>     try:
>         cnx.rollback() # resets the connection
>         return True
>     except Exception:
>         cnx.close()
>         return False
>
>
> class ConnectionPoolBackend(object):
>     """A process-wide LRU-based connection pool.
>     """
>     def __init__(self, maxsize):
>         self._available = threading.Condition(threading.RLock())
>         self._maxsize = maxsize
>         self._active = {}
>         self._pool = []
>         self._pool_key = []
>         self._pool_time = []
>
>     def get_cnx(self, connector, kwargs, timeout=None):
>         num = 1
>         cnx = None
>         key = unicode(kwargs)
>         start = time.time()
>         tid = threading._get_ident()
>         self._available.acquire()
>         while True:
>             try:
>                 # First choice: Return the same cnx already used by the thread
>                 if (tid, key) in self._active:
>                     cnx, num = self._active[(tid, key)]
>                     num += 1
>                 # Second best option: Reuse a live pooled connection
>                 elif key in self._pool_key:
>                     idx = self._pool_key.index(key)
>                     self._pool_key.pop(idx)
>                     self._pool_time.pop(idx)
>                     cnx = self._pool.pop(idx)
>                 # Third best option: Create a new connection
>                 elif len(self._active) + len(self._pool) < self._maxsize:
>                     cnx = connector.get_connection(**kwargs)
>                 # Forth best option: Replace a pooled connection with a new 
> one
>                 elif len(self._active)  < self._maxsize:
>                     # Remove the LRU connection in the pool
>                     self._pool.pop(0).close()
>                     self._pool_key.pop(0)
>                     self._pool_time.pop(0)
>                     cnx = connector.get_connection(**kwargs)
>                 if cnx:
>                     self._active[(tid, key)] = (cnx, num)
>                     return PooledConnection(self, cnx, key, tid)
>                 # Worst option: wait until a connection pool slot is available
>                 if timeout and (time.time() - start) > timeout:
>                     raise TimeoutError(_('Unable to get database '
>                                          'connection within %(time)d '
>                                          'seconds', time=timeout))
>                 elif timeout:
>                     self._available.wait(timeout)
>                 else:
>                     self._available.wait()
>             finally:
>                 self._available.release()
>
>     def _return_cnx(self, cnx, key, tid):
>         self._available.acquire()
>         try:
>             assert (tid, key) in self._active
>             cnx, num = self._active[(tid, key)]
>             if num == 1 and cnx.poolable and try_rollback(cnx):
>                 del self._active[(tid, key)]
>                 self._pool.append(cnx)
>                 self._pool_key.append(key)
>                 self._pool_time.append(time.time())
>             elif num == 1:
>                 del self._active[(tid, key)]
>             else:
>                 self._active[(tid, key)] = (cnx, num - 1)
>         finally:
>             self._available.release()
>
>     def shutdown(self, tid=None):
>         """Close pooled connections not used in a while"""
>         when = time.time() - 60
>         self._available.acquire()
>         try:
>             while self._pool_time and self._pool_time[0] < when:
>                 self._pool.pop(0)
>                 self._pool_key.pop(0)
>                 self._pool_time.pop(0)
>         finally:
>             self._available.release()
>
> backend = ConnectionPoolBackend(5)
>
> class ConnectionPool(object):
>     def __init__(self, maxsize, connector, **kwargs):
>         self._maxsize = maxsize
>         self._connector = connector
>         self._kwargs = kwargs
>
>     def get_cnx(self, timeout=None):
>         return backend.get_cnx(self._connector, self._kwargs, timeout)
>
>     def shutdown(self, tid=None):
>         backend.shutdown(tid)
>

-- 
Dave Abrahams
BoostPro Computing
http://www.boostpro.com


--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups "Trac 
Development" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at 
http://groups.google.com/group/trac-dev?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to