On 09/28/2016 04:51 PM, Viktor Roytman wrote:
I wrote a script with this sort of logic in order to insert many records
into a PostgreSQL table as they are generated.
#!/usr/bin/env python3
import asyncio
from concurrent.futures import ProcessPoolExecutor as pool
from functools import partial

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base

metadata = sa.MetaData(schema='stackoverflow')
Base = declarative_base(metadata=metadata)


class Example(Base):
    __tablename__ = 'example'
    pk = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Text)


sa.event.listen(Base.metadata, 'before_create',
                sa.DDL('CREATE SCHEMA IF NOT EXISTS stackoverflow'))

engine = sa.create_engine(
    'postgresql+psycopg2://postgres:password@localhost:5432/stackoverflow'
)

Base.metadata.create_all(engine)

session = sa.orm.sessionmaker(bind=engine, autocommit=True)()


def task(value):
    engine.dispose()
    with session.begin():
        session.add(Example(text=value))


async def infinite_task(loop):
    spawn_task = partial(loop.run_in_executor, None, task)
    while True:
        await asyncio.wait([spawn_task(value) for value in range(10000)])


def main():
    loop = asyncio.get_event_loop()
    with pool() as executor:
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
    loop.close()


if __name__ == '__main__':
    main()
This code works just fine, creating a pool of as many processes as I
have CPU cores, and happily chugging along forever. I wanted to see how
threads would compare to processes, but I could not get a working
example. Here are the changes I made:
from concurrent.futures import ThreadPoolExecutor as pool

session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()
This version runs for a while before raising a flood of "too many
clients" exceptions:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already
What's causing the problem?
It would appear that either the ThreadPoolExecutor is starting more
threads than your PostgreSQL database has connections available, or your
engine.dispose() is leaving PG connections lying open to be garbage
collected. A single Engine should be all that's necessary in a single
process.
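To illustrate the advice above, here is a minimal sketch of the threaded
version with those two changes applied: one shared Engine for the whole
process, no engine.dispose() inside the worker, and a per-thread Session
from scoped_session. SQLite is used here only so the sketch is
self-contained and runnable; with PostgreSQL the same pattern applies,
and you would additionally pass pool_size/max_overflow to create_engine
so the pool stays below the server's max_connections.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()


class Example(Base):
    __tablename__ = 'example'
    pk = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Text)


# A throwaway SQLite file stands in for PostgreSQL here; the 'timeout'
# makes concurrent writers wait for the lock instead of erroring out.
fd, db_path = tempfile.mkstemp(suffix='.db')
os.close(fd)
engine = sa.create_engine('sqlite:///' + db_path,
                          connect_args={'timeout': 30})
Base.metadata.create_all(engine)

# One Engine per process; scoped_session hands each thread its own
# Session on top of the Engine's shared connection pool.
Session = scoped_session(sessionmaker(bind=engine))


def task(value):
    # Note: no engine.dispose() here. Disposing discards the shared
    # pool and leaves the old connections open until garbage collected,
    # which is one way to exhaust the server's connection limit.
    session = Session()
    try:
        session.add(Example(text=value))
        session.commit()
    finally:
        # Close and discard this thread's session when the work is done.
        Session.remove()


with ThreadPoolExecutor(max_workers=4) as executor:
    # list() forces the map to run and re-raises any worker exception.
    list(executor.map(task, (str(i) for i in range(100))))

with engine.connect() as conn:
    print(conn.execute(sa.text('SELECT COUNT(*) FROM example')).scalar())
```

The number of simultaneously open connections is then bounded by the
Engine's pool rather than by how many threads the executor spawns.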
--
You received this message because you are subscribed to the Google
Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.