Hi,
I'm using PostgreSQL advisory locks in a multithreaded program. Worker
threads acquire locks with
session.execute(select([func.pg_advisory_lock(key)])) during a
transaction and release them just after session.commit(). Sometimes,
however, the connection behind the thread's session will have changed
after COMMIT, making it impossible for the thread to release the locks
it acquired. Note that I'm using scoped_session()'s class methods
everywhere.
A Python script to reproduce the problem and its output are attached
below.
Is there a way to force the session to use the same connection within
a thread? Would I then have to recycle the connections from time to
time?
Thanks,
Julien
=== sa_pg_advisory_locks.py ==
# -*- coding: utf8 -*-
import os
import random
import threading
import time

from sqlalchemy import MetaData
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import (create_session, scoped_session, sessionmaker,
                            reconstructor)
from sqlalchemy.sql import func, select

# Engine built from the DSN given in the TEST_DSN environment variable
# (raises KeyError at import time if the variable is not set).
sa_engine = create_engine(os.environ['TEST_DSN'])

# Session registry shared by the whole script; worker code calls
# session.execute() / session.commit() on it directly.
session = scoped_session(lambda: create_session(
    sa_engine, autoflush=True, expire_on_commit=True, autocommit=False))

# Toggle this switch to see the difference in behaviour
COMMIT_BEFORE_LOCK_RELEASE = True
# COMMIT_BEFORE_LOCK_RELEASE = False

# Single-argument print(...) produces the same output on Python 2 and 3.
print('Will commit %s releasing advisory lock' % (
    'before' if COMMIT_BEFORE_LOCK_RELEASE else 'after'))

# Synchronize program termination
event = threading.Event()
# Test function, run concurrently by every worker thread of the script
def run_test():
    """Repeatedly take and release a PostgreSQL advisory lock via ``session``.

    Each iteration picks a random key, calls ``pg_advisory_lock(key)`` and
    records the backend pid that served the call, runs one more statement,
    then calls ``pg_advisory_unlock(key)`` and records the backend pid
    again.  ``session.commit()`` is issued either before or after the unlock
    call, depending on ``COMMIT_BEFORE_LOCK_RELEASE``.

    The loop ends after 100 iterations or as soon as ``event`` is set.
    ``event`` is always set when the function exits, whether normally or
    through an exception, so the main thread stops waiting.

    Raises:
        AssertionError: if ``pg_advisory_unlock`` returns a false value
            (the message reports the iteration and both pids), or if the
            unlock succeeded but the two pids differ.
    """
    try:
        i = 0
        while 1:
            if event.isSet() or i >= 100:
                break
            # Show sign of life
            if i and (i % 50 == 0):
                print(i)
            key = random.randint(1, 2**16)
            # Acquire the lock and remember which backend served the call.
            pid, _ = session.execute(select([
                func.pg_backend_pid(),
                func.pg_advisory_lock(key),
            ])).fetchone()
            # Extra statement issued between lock and unlock; its result is
            # not used.
            session.execute(select([func.now()])).scalar()
            if COMMIT_BEFORE_LOCK_RELEASE:
                session.commit()
            # Release the lock and record which backend served this call.
            pid_, unlocked = session.execute(select([
                func.pg_backend_pid(),
                func.pg_advisory_unlock(key),
            ])).fetchone()
            if unlocked:
                assert pid_ == pid
            else:
                raise AssertionError(
                    'Iteration %i, acquisition pid %i, release pid %i\n'
                    % (i, pid, pid_))
            if not COMMIT_BEFORE_LOCK_RELEASE:
                session.commit()
            i += 1
    finally:
        # Wake the main thread on success and on failure alike.
        event.set()
# Start 10 daemon worker threads, all running run_test concurrently.
for _ in xrange(10):
    thread = threading.Thread(target=run_test)
    # Daemon threads do not keep the interpreter alive once the main
    # thread finishes.
    thread.daemon = True
    thread.start()
# Block until some worker sets the event (it finished or failed) ...
event.wait()
# ... then pause for one second before the main thread exits.
time.sleep(1)
== output ==
u...@host ~
$ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
Will commit before releasing advisory lock
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 6, acquisition pid 16676, release pid 27340
Exception in thread Thread-5:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 12, acquisition pid 27340, release pid 16676
Exception in thread Thread-7:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 10, acquisition pid 18248, release pid 8452
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in
__bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 18, acquisition pid 8452, release pid 18248
u...@host ~
$ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
Will commit before releasing advisory lock
Exception in thread Thread:Traceback (most recent call last):
File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
self.run()
File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
Exception in thread Thread:Traceback (most recent call last):
File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
self._target(*self._args, **self._kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 2, acquisition pid 18076, release pid 11332
Exception in thread Thread:Traceback (most recent call last):
File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
self.run()
File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
self._target(*self._args, **self._kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
self.run()
File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
self._target(*self._args, **self._kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
Exception in thread Thread:Traceback (most recent call last):
File "C:\cygwin\home\user\jython\Lib\threading.py", line 179, in
_Thread__bootstrap
self.run()
File "C:\cygwin\home\user\jython\Lib\threading.py", line 170, in run
self._target(*self._args, **self._kwargs)
File "sa_pg_advisory_locks.py", line 44, in run_test
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 10, acquisition pid 11332, release pid 22572
AssertionError: Iteration 5, acquisition pid 22572, release pid 14376
raise AssertionError('Iteration %i, acquisition pid %i, release
pid %i\n' % (i, pid, pid_))
AssertionError: Iteration 5, acquisition pid 14376, release pid 18076
u...@host ~
$ TEST_DSN=$CPYTHON_DSN python sa_pg_advisory_locks.py
Will commit after releasing advisory lock
50
50
50
50
50
50
50
50
50
50
u...@host ~
$ TEST_DSN=$JYTHON_DSN jython sa_pg_advisory_locks.py
Will commit after releasing advisory lock
50
50
50
50
50
50
50
50
50
50
--
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to
[email protected].
For more options, visit this group at
http://groups.google.com/group/sqlalchemy?hl=en.