Re: [sqlalchemy] pass additional kwargs to execute into BulkUD _do_exec method
On Thu, Feb 22, 2018 at 6:28 AM, Антонио Антуанwrote: > Hello Mike. > Finally got to this. Works perfectly with synchronize_session equals False > or 'evaluate'. > > Got a problem when synchronize_session='fetch'. In that case used > BulkFetch._do_pre_synchronize (as you know :)). There is query instance with > set `_bind_id` attribute, but it don't passed through `execute`. > > Here is an example that can illustrate that problem: > https://gist.github.com/aCLr/a992ca92138aeee86bf9432693be6d6c > It contains Query and Session implementations, which used in my project: > MultiBoundQuery and MultiBoundSession. > Also, it contains `delete` and `update` execution and checking for update > and delete operation. Tracebacks included. continuing this at https://bitbucket.org/zzzeek/sqlalchemy/issues/4196/support-sharded-query-for-bulk-update , the gerrit needs to have tests for these use cases. > > > > вт, 13 февр. 2018 г. в 20:37, Антонио Антуан : >> >> I understand. Excuse me, drown in work. Going to test your code in a >> couple of days. >> >> >> вт, 13 февр. 2018 г., 20:32 Mike Bayer : >>> >>> i don't plan to move on this until I get feedback. >>> >>> On Tue, Feb 6, 2018 at 7:32 PM, Mike Bayer >>> wrote: >>> > code review at >>> > https://gerrit.sqlalchemy.org/#/c/zzzeek/sqlalchemy/+/656 >>> > and ideally would include basic ShardedQuery support >>> > >>> > On Tue, Feb 6, 2018 at 7:23 PM, Mike Bayer >>> > wrote: >>> >> On Tue, Feb 6, 2018 at 5:09 PM, Антонио Антуан >>> >> wrote: >>> >>> Hello Mike! >>> >>> First of all, thank you for your help with this problem. Me and my >>> >>> crew >>> >>> appreciate it. >>> >>> >>> >>> >>> >>> I have a question in case of `identity_token` for `update` and >>> >>> `delete` >>> >>> methods of `Query` instances. >>> >>> >>> >>> If we take a look on sqlalchemy.orm.persistence.BulkDelete._do_exec, >>> >>> we see, >>> >>> that there is no additonal kwargs passed to `execute` method. So I >>> >>> don't see >>> >>> any way to pass additional kwargs to Session.get_bind. Here is code >>> >>> of 1.0 >>> >>> version: >>> >>> >>> >>> def _do_exec(self): >>> >>> delete_stmt = sql.delete(self.primary_table, >>> >>> self.context.whereclause) >>> >>> >>> >>> self.result = self.query.session.execute( >>> >>> delete_stmt, >>> >>> params=self.query._params, >>> >>> mapper=self.mapper, >>> >>> # need to pass here additional kwargs >>> >>> ) >>> >>> self.rowcount = self.result.rowcount >>> >>> >>> >>> Code from master branch (almost the same): >>> >>> >>> >>> def _execute_stmt(self, stmt): >>> >>> self.result = self.query.session.execute( >>> >>> stmt, params=self.query._params, >>> >>> mapper=self.mapper) >>> >>> self.rowcount = self.result.rowcount >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> As you may remember, I have some kind of `CustomQuery` with >>> >>> `set_bind` >>> >>> method (like the same into >>> >>> sqlalchemy.ext.horizontal_shard.ShardedQuery). So >>> >>> I have `_bind_id` attribute within instances of that class. >>> >>> My question is: Is there any way to pass `self.query._bind_id` to >>> >>> `self.query.session.execute` (see above)? >>> >> >>> >> for bulk operations to work with sharding it would need to be able to >>> >> emit the UPDATE or DELETE statement across multiple shards and work in >>> >> a similar way to ShardedQuery._execute_and_instances, giving it the >>> >> chance to consult with query_chooser. >>> >> >>> >> can you please see if you can work with this? thanks >>> >> >>> >> diff --git a/lib/sqlalchemy/orm/persistence.py >>> >> b/lib/sqlalchemy/orm/persistence.py >>> >> index dc0ae1c38..6c55dee92 100644 >>> >> --- a/lib/sqlalchemy/orm/persistence.py >>> >> +++ b/lib/sqlalchemy/orm/persistence.py >>> >> @@ -1327,9 +1327,8 @@ class BulkUD(object): >>> >> self._do_post() >>> >> >>> >> def _execute_stmt(self, stmt): >>> >> -self.result = self.query.session.execute( >>> >> -stmt, params=self.query._params, >>> >> -mapper=self.mapper) >>> >> +self.result = self.query._execute_crud(stmt, self.mapper) >>> >> + >>> >> self.rowcount = self.result.rowcount >>> >> >>> >> @util.dependencies("sqlalchemy.orm.query") >>> >> diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py >>> >> index 4f7b22cc9..91adfb79d 100644 >>> >> --- a/lib/sqlalchemy/orm/query.py >>> >> +++ b/lib/sqlalchemy/orm/query.py >>> >> @@ -2901,6 +2901,12 @@ class Query(object): >>> >> result = conn.execute(querycontext.statement, self._params) >>> >> return loading.instances(querycontext.query, result, >>> >> querycontext) >>> >> >>> >> +def _execute_crud(self, stmt, mapper): >>> >> +conn = self._connection_from_session( >>> >> +
Re: [sqlalchemy] pass additional kwargs to execute into BulkUD _do_exec method
Hello Mike. Finally got to this. Works perfectly with synchronize_session equals False or 'evaluate'. Got a problem when synchronize_session='fetch'. In that case used BulkFetch._do_pre_synchronize (as you know :)). There is query instance with set `_bind_id` attribute, but it don't passed through `execute`. Here is an example that can illustrate that problem: https://gist.github.com/aCLr/a992ca92138aeee86bf9432693be6d6c It contains Query and Session implementations, which used in my project: MultiBoundQuery and MultiBoundSession. Also, it contains `delete` and `update` execution and checking for update and delete operation. Tracebacks included. вт, 13 февр. 2018 г. в 20:37, Антонио Антуан: > I understand. Excuse me, drown in work. Going to test your code in a > couple of days. > > вт, 13 февр. 2018 г., 20:32 Mike Bayer : > >> i don't plan to move on this until I get feedback. >> >> On Tue, Feb 6, 2018 at 7:32 PM, Mike Bayer >> wrote: >> > code review at >> https://gerrit.sqlalchemy.org/#/c/zzzeek/sqlalchemy/+/656 >> > and ideally would include basic ShardedQuery support >> > >> > On Tue, Feb 6, 2018 at 7:23 PM, Mike Bayer >> wrote: >> >> On Tue, Feb 6, 2018 at 5:09 PM, Антонио Антуан >> wrote: >> >>> Hello Mike! >> >>> First of all, thank you for your help with this problem. Me and my >> crew >> >>> appreciate it. >> >>> >> >>> >> >>> I have a question in case of `identity_token` for `update` and >> `delete` >> >>> methods of `Query` instances. >> >>> >> >>> If we take a look on sqlalchemy.orm.persistence.BulkDelete._do_exec, >> we see, >> >>> that there is no additonal kwargs passed to `execute` method. So I >> don't see >> >>> any way to pass additional kwargs to Session.get_bind. Here is code >> of 1.0 >> >>> version: >> >>> >> >>> def _do_exec(self): >> >>> delete_stmt = sql.delete(self.primary_table, >> >>> self.context.whereclause) >> >>> >> >>> self.result = self.query.session.execute( >> >>> delete_stmt, >> >>> params=self.query._params, >> >>> mapper=self.mapper, >> >>> # need to pass here additional kwargs >> >>> ) >> >>> self.rowcount = self.result.rowcount >> >>> >> >>> Code from master branch (almost the same): >> >>> >> >>> def _execute_stmt(self, stmt): >> >>> self.result = self.query.session.execute( >> >>> stmt, params=self.query._params, >> >>> mapper=self.mapper) >> >>> self.rowcount = self.result.rowcount >> >>> >> >>> >> >>> >> >>> >> >>> As you may remember, I have some kind of `CustomQuery` with `set_bind` >> >>> method (like the same into >> sqlalchemy.ext.horizontal_shard.ShardedQuery). So >> >>> I have `_bind_id` attribute within instances of that class. >> >>> My question is: Is there any way to pass `self.query._bind_id` to >> >>> `self.query.session.execute` (see above)? >> >> >> >> for bulk operations to work with sharding it would need to be able to >> >> emit the UPDATE or DELETE statement across multiple shards and work in >> >> a similar way to ShardedQuery._execute_and_instances, giving it the >> >> chance to consult with query_chooser. >> >> >> >> can you please see if you can work with this? thanks >> >> >> >> diff --git a/lib/sqlalchemy/orm/persistence.py >> >> b/lib/sqlalchemy/orm/persistence.py >> >> index dc0ae1c38..6c55dee92 100644 >> >> --- a/lib/sqlalchemy/orm/persistence.py >> >> +++ b/lib/sqlalchemy/orm/persistence.py >> >> @@ -1327,9 +1327,8 @@ class BulkUD(object): >> >> self._do_post() >> >> >> >> def _execute_stmt(self, stmt): >> >> -self.result = self.query.session.execute( >> >> -stmt, params=self.query._params, >> >> -mapper=self.mapper) >> >> +self.result = self.query._execute_crud(stmt, self.mapper) >> >> + >> >> self.rowcount = self.result.rowcount >> >> >> >> @util.dependencies("sqlalchemy.orm.query") >> >> diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py >> >> index 4f7b22cc9..91adfb79d 100644 >> >> --- a/lib/sqlalchemy/orm/query.py >> >> +++ b/lib/sqlalchemy/orm/query.py >> >> @@ -2901,6 +2901,12 @@ class Query(object): >> >> result = conn.execute(querycontext.statement, self._params) >> >> return loading.instances(querycontext.query, result, >> querycontext) >> >> >> >> +def _execute_crud(self, stmt, mapper): >> >> +conn = self._connection_from_session( >> >> +mapper=mapper, clause=stmt, close_with_result=True) >> >> + >> >> +return conn.execute(stmt, self._params) >> >> + >> >> def _get_bind_args(self, querycontext, fn, **kw): >> >> return fn( >> >> mapper=self._bind_mapper(), >> >> >> >> >> >> >> >> >> >> >> >> >> >>> >> >>> -- >> >>> SQLAlchemy - >> >>> The Python SQL Toolkit and Object Relational Mapper >> >>> >> >>>
Re: [sqlalchemy] Problems with versioning when using a class mapped on a select of a table in SQLAlchemy 1.2
Ok, thanks for the quick response! Keep up the exceptionally good work! Op woensdag 21 februari 2018 17:19:44 UTC+1 schreef Mike Bayer: > > On Wed, Feb 21, 2018 at 10:04 AM, Mike Bayer> wrote: > > On Wed, Feb 21, 2018 at 6:33 AM, Jeff Horemans > wrote: > >> When migrating from 1.1 tot 1.2, we noticed that the version_id is not > >> getting set on a class that maps to a select of a table. > >> I've added a test case class below to the test_versioning.py included > in > >> SQLAlchemy to confirm this behaviour. > >> This case runs fine in versions 1.0 and 1.1, but gives a KeyError on > the > >> version_id in 1.2 as shown in the stack traces below. > >> > >> I'll be happy to make an issue on the repository if needed. > > > > no need, this is completely perfect, I'll create the issue. I hope to > > get this into 1.2.4 which I need to release hopefully this week / > > today preferred (but I've wanted to release for two days already, ran > > out of time). thanks! > > this is > https://bitbucket.org/zzzeek/sqlalchemy/issues/4193/versioning-logic-fails-for-mapping-against > > , where you'll note that I found two more issues with versioning and > mappers to select statements, which you've already been working > around. this is not a use case we tested for. > > > > > > > >> > >> class VersioningSelectTest(fixtures.MappedTest): > >> > >> __backend__ = True > >> > >> @classmethod > >> def define_tables(cls, metadata): > >> Table('version_table', metadata, > >> Column('id', Integer, primary_key=True, > >> test_needs_autoincrement=True), > >> Column('version_id', Integer, nullable=False), > >> Column('value', String(40), nullable=False)) > >> > >> @classmethod > >> def setup_classes(cls): > >> class Foo(cls.Basic): > >> pass > >> > >> def _fixture(self): > >> Foo, version_table = self.classes.Foo, > self.tables.version_table > >> > >> current = version_table.select().where(version_table.c.id > > >> 0).alias('current_table') > >> > >> mapper(Foo, current, version_id_col=version_table.c.version_id) > >> s1 = Session() > >> return s1 > >> > >> @testing.emits_warning(r".*versioning cannot be verified") > >> def test_multiple_updates(self): > >> Foo = self.classes.Foo > >> > >> s1 = self._fixture() > >> f1 = Foo(value='f1') > >> f2 = Foo(value='f2') > >> s1.add_all((f1, f2)) > >> s1.commit() > >> > >> f1.value = 'f1rev2' > >> f2.value = 'f2rev2' > >> s1.commit() > >> > >> eq_( > >> s1.query(Foo.id, Foo.value, > >> Foo.version_id).order_by(Foo.id).all(), > >> [(f1.id, 'f1rev2', 2), (f2.id, 'f2rev2', 2)] > >> ) > >> > >> > >> > >> FAIL > >> > test/orm/test_versioning.py::VersioningSelectTest_postgresql+psycopg2_9_5_11::()::test_multiple_updates > > > >> > >> == > FAILURES > >> == > >> ___ > >> VersioningSelectTest_postgresql+psycopg2_9_5_11.test_multiple_updates > >> > >> Traceback (most recent call last): > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/orm/test_versioning.py", > > > >> line 131, in test_multiple_updates > >> s1.commit() > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py", > > > >> line 943, in commit > >> self.transaction.commit() > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py", > > > >> line 467, in commit > >> self._prepare_impl() > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py", > > > >> line 447, in _prepare_impl > >> self.session.flush() > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py", > > > >> line 2243, in flush > >> self._flush(objects) > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py", > > > >> line 2369, in _flush > >> transaction.rollback(_capture_exception=True) > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/util/langhelpers.py", > > > >> line 66, in __exit__ > >> compat.reraise(exc_type, exc_value, exc_tb) > >> File > >> > "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py", > > > >> line 2333, in