Hi Michael et al,
I am banging my ahead into a (so it seems) trivial problem for days now.
Basically, I finally need to lock my SQLite database because multiple
threads are writing to it. This does not happen to often, but I have a
single thread that is dumping thousands of records into the database
while the user interface might concurrently do simple updates like
"update component set short_description='Foo' where id=10".
Originally the sync thread was slow enough that the SQLite side of
locking did work. Now that it is optimized a bit it does not keep up
anymore. This leads to all other requests going the "Database is locked"
way of failing.
99% of the time all db access is sequential so I figured it would
suffice to lock the database before I do anything to it and unlock the
database when done. I tried to instrument all code but with the unit of
work pattern it is hard to find out where the lock was forgotten...
So my current approach is to use the before_execute event to open a lock
and when a connection is returned to the pool I unlock it. I attached
the code of that mechanism.
My first tests with the SQLAlchemy core where promising, but when using
the ORM I get a bunch of deadlocks where it seems like the session opens
two connections A and B where A locks B out. I can provide more data and
example code, but I would first like to know if my approach is
completely bogus in your eyes.
If it is I am open to better ideas.
BTW: Originally I captured before_flush and commit/rollback session
events, but this still created locking errors due to read requests going
unchecked.
Greetings, Torsten
--
DYNAmore Gesellschaft fuer Ingenieurdienstleistungen mbH
Torsten Landschoff
Office Dresden
Tel: +49-(0)351-4519587
Fax: +49-(0)351-4519561
mailto:[email protected]
http://www.dynamore.de
Registration court: Mannheim, HRB: 109659, based in Karlsruhe,
Managing director: Prof. Dr. K. Schweizerhof, Dipl.-Math. U. Franz
--
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to
[email protected].
For more options, visit this group at
http://groups.google.com/group/sqlalchemy?hl=en.
# -*- coding: utf-8 -*-
import collections
import threading
import traceback
from sqlalchemy import event
from sqlalchemy.engine import Connection
class DatabaseAutoLocker(object):
"""
Verwaltet eine Zugriffssperre auf eine Datenbank, die den exklusiven Zugriff
durch einen einzelnen Thread sicherstellt. Die gedachte Verwendung ist, eine
Instanz dieser Klasse nach dem Erstellen der Engine anzulegen::
engine = create_engine("sqlite:///test.db")
DatabaseAutoLocker(engine)
Die DatabaseAutoLocker-Instanz "hängt" sich dann über Events an die engine und
lebt, solange diese engine existiert. Daher braucht man keine Referenz auf
den Locker aufzubewahren.
"""
def __init__(self, engine, timeout=None):
"""
Erstellt einen Autolocker für die in *engine* gegebene Datenbank-Engine.
:param engine: Datenbank-Engine als Instanz von sqlalchemy.engine
:param timeout: Zeit in Sekunden, die ein Client auf die Datenbankverbindung
wartet, bevor eine Exception geworfen wird. None (default) deaktiviert
den Timeout und der Client wartet bis zuletzt.
"""
self.timeout = timeout
#: Schützt den Zugriff auf interne Daten
self._mutex = threading.RLock()
#: Enthält die Liste der noch auf die Datenbanksperre wartenden Clienten
self._pending_requests = collections.deque()
#: Aktuell aktive Verbindung (diese hat die Datenbank für sich gesperrt). None,
#: wenn keine Verbindung die Sperre hat.
self._active_dbapi_connection = None
#: Wenn aktiviert liefert dies den Traceback des Aufrufers, der die Datenbank
#: gegenwärtig gesperrt hält, sonst immer None.
self._active_locker_traceback = None
event.listen(engine, "before_execute", self.__connection_event_before_execute)
event.listen(engine, "checkin", self.__pool_event_checkin)
def __connection_event_before_execute(self, conn, clauseelement, multiparams, params):
"""
Registriert die erste Ausführung eines Kommandos über eine Datenbankverbindung.
Hier muss die Datenbank für andere Verbindung gesperrt werden.
"""
dbapi_connection = _get_dbapi_connection(conn)
request = None
with self._mutex:
if self._active_dbapi_connection is dbapi_connection:
# Nichts zu tun, die Verbindung ist schon im Besitz der Sperre.
return
locker_traceback = None
if self.timeout is not None:
locker_traceback = traceback.format_stack()
if self._active_dbapi_connection is None:
# Keine andere Verbindung aktiv, dann ist dies jetzt die aktive Verbindung
self._active_dbapi_connection = dbapi_connection
self._active_locker_traceback = locker_traceback
else:
# Ansonsten müssen wir einen Antrag stellen
request = _Request(dbapi_connection, locker_traceback)
self._pending_requests.append(request)
if request:
request.wait(self.timeout)
with self._mutex:
if dbapi_connection is self._active_dbapi_connection:
# Wir wurden durch den vorigen Client als neuer Besitzer der Datenbank
# eingetragen.
pass
else:
# Dann war es ein Timeout und wir geben auf. Demnach sollte der Request
# noch in der Liste von wartenden Anfragen sein und wir entfernen uns
# wieder.
self._pending_requests.remove(request)
self.__raise_locked_exception()
def __raise_locked_exception(self):
if self._active_locker_traceback:
raise DatabaseLockedException(
"Database is locked by connection {0!r}, established here:\n{1}."
.format(self._active_dbapi_connection, "".join(self._active_locker_traceback)))
else:
raise DatabaseLockedException(
"Database is locked by connection {0!r}.".format(self._active_dbapi_connection))
def __pool_event_checkin(self, dbapi_connection, connection_record):
"""
Reagiert auf die Rückgabe einer Datenbankverbindung an den Pool. In
diesem Moment ist klar, dass mit der Verbindung nichts mehr gemacht
wird und wir können die Sperre dafür aufheben.
"""
with self._mutex:
if dbapi_connection is self._active_dbapi_connection:
# Der eine Client ist fertig, Datenbank freigeben.
self._active_dbapi_connection = None
# Wenn jemand wartet, diesen gleich benachrichtigen und ihn die Sperre
# übernehmen lassen.
if self._pending_requests:
request = self._pending_requests.popleft()
self._active_dbapi_connection = request.dbapi_connection
self._active_locker_traceback = request.locker_traceback
request.notify()
class _Request(object):
__slots__ = "dbapi_connection", "event", "locker_traceback"
def __init__(self, dbapi_connection, locker_traceback):
self.dbapi_connection = dbapi_connection
self.locker_traceback = locker_traceback
self.event = threading.Event()
def notify(self):
self.event.set()
def wait(self, timeout):
self.event.wait(timeout)
class DatabaseLockedException(Exception):
u"""
Ein anderer Thread hat die Datenbank solange gesperrt gehalten, dass der
Versuch, diese zu sperren, gescheitert ist.
"""
pass
def _get_dbapi_connection(conn):
if not isinstance(conn, Connection):
raise TypeError("{0!r} is not a SQLAlchemy connection.".format(conn))
dbapi_connection = conn.connection.connection
return dbapi_connection