Hello!
I have situation:
1) I have PostgreSQL database: max-connection == 5
2) I have one IZopeDatabaseAdapter
2) I create 10 threads and join's (!!!) to main thread
2.1) In each thread i get IZopeConnection from IZopeDatabaseAdapter
2.2) execute simple select-statement call transaction.commit
Result: sometimes (~ 1/10) i get exception connection limit exceeded.
I think: problem is safety stopped threads (and his locals) in memory
along short time = connections also in memory short time and not
destroed and not closed. Of course, i can call 'disconnect' every time,
but it's bad.
May be make connections-pool (only da with threadsafety==2) and after
transaction.commit\abort move connection to connections-pool for use in
other threads?
attach:
1) thread_test.py
2) pgda.py - simple da for PostgreSQL for pyPgSql
Thanks!
# -*- coding:utf-8 -*-
простая реализация DA к PostgreSQL
$Id$
from zope.app.rdb import ZopeDatabaseAdapter, parseDSN
import pyPgSQL.PgSQL as PgSQL
class PgDA(ZopeDatabaseAdapter):
simple pyPgSQL adapter for Zope3
__name__ = __parent__ = None
threadsafety = 1
def _connection_factory(self):
conn_info = parseDSN(self.dsn)
encoding = conn_info.get('client_encoding','UTF-8')
pg_conn = PgSQL.connect(host=conn_info['host'],
database=conn_info['dbname'],
client_encoding=encoding,
user=conn_info['username'],
password=conn_info['password'])
return ProxyConnection(pg_conn)
class ProxyConnection:
прокси-класс для перехвата вызовов метода cursor()
def __init__(self, connection):
self.conn = connection
def __getattr__(self, key):
return getattr(self.conn, key)
def cursor(self):
return ProxyCursor(self.conn.cursor())
class ProxyCursor:
прокси-класс для перехвата вызова метода execute и executemany: исправляет ? на %s ибо того хочет pyPgSQL
# TODO: простой replace - плохо = надо заменить или найти другое решение
def __init__(self, cursor):
self.cursor = cursor
def __getattr__(self, key):
return getattr(self.cursor, key)
def execute(self, sql, params=None):
sql = sql.replace('?', '%s')
if params:
return self.cursor.execute(sql, params)
else:
return self.cursor.execute(sql)
def executemany(self, sql, params):
sql = sql.replace('?', '%s')
#return self.cursor.executemany(sql, params)
total_sql = ''
for param in params:
new_param = tuple(map(_quote,param))
total_sql += (sql % new_param) + ';'
self.cursor.execute(total_sql)
import string
import types
def _quote(x):
copy/paste/modified from PyGresSQL
if isinstance(x, unicode):
x = x.encode( 'utf-8' )
if isinstance(x, types.StringType):
x = ' + string.replace(
string.replace(str(x), '\\', ''), ', '') + '
elif isinstance(x, (types.IntType, types.LongType, types.FloatType)):
pass
elif x is None:
x = 'NULL'
elif isinstance(x, (types.ListType, types.TupleType)):
x = '(%s)' % string.join(map(lambda x: str(_quote(x)), x), ',')
elif hasattr(x, '__pg_repr__'):
x = x.__pg_repr__()
else:
raise InterfaceError, 'do not know how to handle type %s' % type(x)
return x
#! /usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import transaction
from zope.app import zapi
from zope.app.testing import ztapi
from zope.app.rdb.interfaces import IZopeDatabaseAdapter
from pgda import PgDA
def f(num):
print 'start thread ', num
zda = zapi.getUtility(IZopeDatabaseAdapter)
conn = zda()
sql = 'select count(*) from pg_user;'
cur = conn.cursor()
cur.execute(sql)
transaction.commit()
print 'end thread ', num
def setupPgDA():
da = PgDA('dbi://test:[EMAIL PROTECTED]/testdb')
ztapi.provideUtility(IZopeDatabaseAdapter, da)
def start():
for i in range(7):
t = threading.Thread(target=f,args=(i+1,))
t.start()
t.join()
if __name__ == '__main__':
setupPgDA()
for i in range(2):
start()
___
Zope3-dev mailing list
Zope3-dev@zope.org
Unsub: http://mail.zope.org/mailman/options/zope3-dev/archive%40mail-archive.com