Hi,
I have had an issue with bulk operations arising from the following use
case:
I was looping over an iterator of mappings, which are either already in the
DB (updates) or not (inserts). As I did not want to loop over it twice and
wanted to use only one transaction, I used greenlets to split the iterator
and concurrently run `bulk_insert_mappings` and `bulk_update_mappings` on
the same session object. It looks like this:
"""
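Insert some greenlet magic into generators
"""
from greenlet import greenlet


def greenletify_gen(mapping):
    """ This generator will be passed to bulk operations
    Greenlets allow us to get out of the bulk methods
    And thus run them concurrently """
    if mapping is None:
        # Nothing to process: end the generator right away.
        # (A bare return replaces raise StopIteration(), which newer
        # Python versions turn into a RuntimeError inside generators.)
        return
    yield mapping
    # Hand control back to the parent greenlet and wait for the next
    # mapping; a None sent from the parent ends the iteration.
    for mapping in iter(greenlet.getcurrent().parent.switch, None):
        yield mapping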
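To make the two-argument `iter(callable, sentinel)` form used in `greenletify_gen` easier to follow, here is a small sketch without greenlets or a DB. `next_mapping` and `pending` are made-up stand-ins for `greenlet.getcurrent().parent.switch` and for the values the parent sends, so this only illustrates the stop-on-`None` behaviour, not the greenlet switching itself.

```python
# Stand-in for the values the parent greenlet would send via switch();
# the trailing None plays the role of switch(None).
pending = [{'id': 2, 'value': 2}, {'id': 1, 'value': 2}, None]


def next_mapping():
    """Hypothetical feeder: return the next queued value (None means stop)."""
    return pending.pop(0)


# iter(callable, sentinel) calls next_mapping() repeatedly and stops as soon
# as it returns the sentinel (None); the sentinel itself is not yielded.
received = list(iter(next_mapping, None))
print(received)
```

In the real generator each call blocks inside `switch` until the main code switches back in with the next mapping, which is what lets two bulk methods be fed in turn.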
"""
Concurrently run bulk operations
"""
from test_model import TestModel
from session_ctx_mgr import session_ctx_mgr
from sqlalchemy.exc import ResourceClosedError

with session_ctx_mgr() as session:
    insert_greenlet = greenlet(lambda mapping:
        session.bulk_insert_mappings(TestModel, greenletify_gen(mapping)))
    update_greenlet = greenlet(lambda mapping:
        session.bulk_update_mappings(TestModel, greenletify_gen(mapping)))
    insert_greenlet.switch({'id': 2, 'value': 2})
    update_greenlet.switch({'id': 1, 'value': 2})
    insert_greenlet.switch(None)
    update_greenlet.switch(None)
However, running this example raises the following error:
Traceback (most recent call last):
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2332, in _bulk_save_mappings
    isstates, update_changed_only)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 100, in _bulk_update
    if session_transaction.session.connection_callable:
AttributeError: 'NoneType' object has no attribute 'connection_callable'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "drunken-octo-dubstep.py", line 43, in <module>
    update_greenlet.switch(None)
  File "drunken-octo-dubstep.py", line 37, in <lambda>
    update_greenlet = greenlet(lambda mapping:
    session.bulk_update_mappings(TestModel, greenletify_gen(mapping)))
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2318, in bulk_update_mappings
    self._bulk_save_mappings(mapper, mappings, True, False, False, False)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2340, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 182, in reraise
    raise value
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2340, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 408, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/mnt/vendor/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
sqlalchemy.exc.ResourceClosedError: This transaction is closed
Now, what's funny is that when you swap the last two lines (i.e., so that
`bulk_update_mappings` ends before `bulk_insert_mappings`), the exception
disappears. I managed to generalize the behavior, and it seems to depend
entirely on the first mapping of the loop. If it was an insert,
`bulk_update_mappings` must end first. If it was an update,
`bulk_insert_mappings` must end first.
The whole source code for the example is available
here: https://github.com/Loamhoof/drunken-octo-dubstep, in the file
`drunken-octo-dubstep.py`.
Now, I posted here rather than opening an issue because:
1) I'm not sure there isn't another, more legitimate way to do what I want
2) I'm not sure this way of using the API should be supported
So, the related questions:
1) Is there indeed a better way to run both bulk operations while looping
over an iterator only once?
2) Should this be considered a bug?
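For comparison with question 1, the plain alternative would be to buffer the mappings into two lists during a single pass and then hand each list to its bulk method one after the other. A rough sketch, assuming everything fits in memory; `partition_mappings`, `is_existing` and `existing_ids` are made-up names for illustration, not part of the SQLAlchemy API.

```python
def partition_mappings(mappings, is_existing):
    """Split an iterable of mappings into (inserts, updates) in one pass.

    is_existing is a hypothetical predicate telling whether a mapping's
    row is already in the DB.
    """
    inserts, updates = [], []
    for mapping in mappings:
        (updates if is_existing(mapping) else inserts).append(mapping)
    return inserts, updates


# Assumption for the example: the row with id 1 already exists.
existing_ids = {1}
mappings = iter([{'id': 2, 'value': 2}, {'id': 1, 'value': 2}])

inserts, updates = partition_mappings(
    mappings, lambda m: m['id'] in existing_ids)
print(inserts, updates)
```

The two lists could then be passed to `session.bulk_insert_mappings(TestModel, inserts)` and `session.bulk_update_mappings(TestModel, updates)` sequentially, so the two bulk calls never overlap on the session.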
Versions used:
SQLAlchemy==1.0.5
greenlet==0.4.7
psycopg2==2.6.1
and Postgres 9.4.1, Python 3.4.3
Thanks for your time!
Regards,
Soeren