Hi:
SQLAlchemy (SQLA) is used in my Python project, and my code runs its work in
green threads. In some places a green thread gets killed. When a thread was
killed just after its transaction had begun, the database connection stayed
on the PostgreSQL server side with status "idle in transaction". As a result,
the connections in my pool were exhausted.
Traceback (most recent call last):
  File "/usr/lib/python2.7/dist-packages/eventlet/hubs/hub.py", line 457, in fire_timers
    timer()
  File "/usr/lib/python2.7/dist-packages/eventlet/hubs/timer.py", line 58, in __call__
    cb(*args, **kw)
  File "/usr/lib/python2.7/dist-packages/eventlet/greenthread.py", line 214, in main
    result = function(*args, **kwargs)
  File "/tmp/test.py", line 48, in operate_db
    query = session.query(Test).all()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2588, in all
    return list(self)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
    close_with_result=True)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
    **kw)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 905, in connection
    execution_options=execution_options)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 910, in _connection_for_bind
    engine, execution_options)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 334, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2039, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2074, in _wrap_pool_connect
    return fn()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 376, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 714, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 480, in checkout
    rec = pool._do_get()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1054, in _do_get
    (self.size(), self.overflow(), self._timeout))
TimeoutError: QueuePool limit of size 2 overflow 0 reached, connection timed out, timeout 30
Questions:
1. When will the connection be returned to the pool?
In my understanding, the connection is returned when a rollback/commit is
issued.
2. Will the connection be returned to the pool if my green thread is killed?
From the code, we can see that the connection object still seems to be in the
pool with active status.
Also, no GreenletExit exception shows up here when the green thread is simply
killed safely.
3. Will the connection be returned to the pool if the DB transaction ("idle in
transaction") is rolled back by my database
(idle_in_transaction_session_timeout)? If not, how can we handle a
connection whose status is "idle in transaction"?
I also killed the database transaction process on the PG server, but the
connection was still not returned to the pool.
The code below has been updated to use eventlet monkey patching.
Thanks.
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
import eventlet
eventlet.monkey_patch(psycopg=True, thread=True)
import threading
import eventlet
from eventlet import greenthread
from eventlet import greenpool
import sqlalchemy
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker, mapper
import time
import logging
# Turn on DEBUG output globally, then explicitly for each SQLAlchemy logger
# whose messages matter when diagnosing pool checkouts and checkins.
logging.basicConfig(level=logging.DEBUG)
for _sqla_logger in (
    'sqlalchemy.pool',
    'sqlalchemy.engine',
    'sqlalchemy.dialects',
    'sqlalchemy.orm',
    'sqlalchemy.dialects.postgresql',
):
    logging.getLogger(_sqla_logger).setLevel(logging.DEBUG)
import time
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,Table,MetaData,ForeignKey
from sqlalchemy import func,select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship
from sqlalchemy.orm import backref
from sqlalchemy.pool import NullPool
from sqlalchemy.pool import QueuePool
import multiprocessing
class Test(object):
    """Plain object holding one row of the ``test`` table.

    The class itself carries no persistence logic; the script associates it
    with the table through a separate ``mapper(Test, table)`` call.
    """

    def __init__(self, f1, f2, f3):
        """Store the three column values ``f1``, ``f2`` and ``f3``."""
        self.f1 = f1
        self.f2 = f2
        self.f3 = f3
def get_session(engine):
    """Return a new autocommit-mode session bound to ``engine``.

    A fresh ``sessionmaker`` factory is built on every call and invoked once,
    so each caller receives its own session object.
    """
    Session = sessionmaker(engine, autocommit=True)
    return Session()
def operate_db(**kwargs):
    """Run one query inside an explicit transaction and hold it for 20 seconds.

    Keyword Args:
        id: label appended to the "in"/"out" progress messages.
        session: session to work with; ``begin``, ``query``, ``rollback``
            and ``close`` are called on it.

    The pool status of the module-level ``engine`` is printed before the
    sleep, after the sleep and after the rollback.

    The transaction is wrapped in ``try``/``finally`` so that the session is
    always closed. Per the eventlet documentation, ``greenthread.kill``
    terminates a green thread by raising an exception into it
    (``GreenletExit`` by default). If that happens during the sleep, the
    ``session.rollback()`` below is never reached; without the ``finally``
    the session would keep its connection checked out with the transaction
    still open.
    """
    worker_id = kwargs["id"]
    session = kwargs["session"]

    def report(label):
        # Single pre-formatted argument: prints the same text as the
        # Python 2 statement ``print a, b, c`` and is valid on Python 3 too.
        print("%s %s %s" % (engine.pool.status(), label, engine.pool._creator))

    print("in operate_db" + str(worker_id))
    try:
        session.begin()
        # Only the round trip to the database matters here; the rows are
        # deliberately discarded.
        session.query(Test).all()
        report("before sleep dump_table1 connection object id ")
        greenthread.sleep(20)
        report("after sleep dump_table1 connection object id ")
        session.rollback()
        report("after commit dump_table1 connection object id ")
    finally:
        # Runs on normal exit and when an exception (including the one
        # raised by greenthread.kill) unwinds this frame; closing the session
        # releases its transactional/connection resources.
        session.close()
    print("out operate_db" + str(worker_id))
def _thread_done(gt, *args, **kwargs):
    """Completion callback for a green thread; intentionally does nothing.

    Accepts the finished green thread ``gt`` plus any extra positional and
    keyword arguments supplied when the callback was linked, and ignores
    them all.
    """
    pass
# Engine with a deliberately tiny pool (two connections, no overflow) so that
# connections which are not handed back quickly show up as a pool timeout.
engine = create_engine(
    'postgres://xxx:[email protected]:5433/test96',
    pool_size=2,
    max_overflow=0,
    pool_recycle=500,
    echo=True,
    echo_pool='debug',
)
engine.echo = True

# Three-integer-column table, created if missing and mapped onto ``Test``.
metadata = MetaData()
table = Table(
    'test',
    metadata,
    Column('f1', Integer, primary_key=True),
    Column('f2', Integer),
    Column('f3', Integer),
)
metadata.create_all(engine)
mapper(Test, table)

# Green pool that runs at most two workers at a time.
thread_groups = greenpool.GreenPool(2)
def _thread_done(gt, *args, **kwargs):
    """Completion callback for a green thread; intentionally does nothing.

    NOTE: this rebinds the ``_thread_done`` name already defined earlier in
    the script with an identical no-op body.
    """
    pass
def _spawn_worker(worker_id):
    """Spawn ``operate_db`` with its own session and link the done callback."""
    worker = thread_groups.spawn(
        operate_db, id=worker_id, session=get_session(engine))
    worker.link(_thread_done, group=thread_groups, thread=worker)
    return worker


# Start two workers; each opens a transaction and then sleeps inside it.
tg1 = _spawn_worker(1)
tg2 = _spawn_worker(2)

greenthread.sleep(10)

# Kill both green threads while they are still sleeping inside their
# transactions (reported symptom: the connections stay "idle in transaction").
greenthread.kill(tg1)
greenthread.kill(tg2)
print("kill success!")

greenthread.sleep(10)

# Start a third worker with a new transaction; this is the step reported to
# fail with the QueuePool limit error.
tg3 = _spawn_worker(3)
print("%s %s %s" % (
    engine.pool.status(),
    "in main dump_table1 connection object id ",
    engine.pool._creator,
))
greenthread.sleep(40)