Re: [sqlalchemy] pass additional kwargs to execute into BulkUD _do_exec method

2018-02-22 Thread Mike Bayer
On Thu, Feb 22, 2018 at 6:28 AM, Антонио Антуан  wrote:
> Hello Mike.
> Finally got to this. Works perfectly with synchronize_session equals False
> or 'evaluate'.
>
> Got a problem when synchronize_session='fetch'. In that case used
> BulkFetch._do_pre_synchronize (as you know :)). There is query instance with
> set `_bind_id` attribute, but it don't passed through `execute`.
>
> Here is an example that can illustrate that problem:
> https://gist.github.com/aCLr/a992ca92138aeee86bf9432693be6d6c
> It contains Query and Session implementations, which used in my project:
> MultiBoundQuery and MultiBoundSession.
> Also, it contains `delete` and `update` execution and checking for update
> and delete operation. Tracebacks included.

continuing this at
https://bitbucket.org/zzzeek/sqlalchemy/issues/4196/support-sharded-query-for-bulk-update
, the gerrit needs to have tests for these use cases.


>
>
>
> вт, 13 февр. 2018 г. в 20:37, Антонио Антуан :
>>
>> I understand. Excuse me, drown in work. Going to test your code in a
>> couple of days.
>>
>>
>> вт, 13 февр. 2018 г., 20:32 Mike Bayer :
>>>
>>> i don't plan to move on this until I get feedback.
>>>
>>> On Tue, Feb 6, 2018 at 7:32 PM, Mike Bayer 
>>> wrote:
>>> > code review at
>>> > https://gerrit.sqlalchemy.org/#/c/zzzeek/sqlalchemy/+/656
>>> > and ideally would include basic ShardedQuery support
>>> >
>>> > On Tue, Feb 6, 2018 at 7:23 PM, Mike Bayer 
>>> > wrote:
>>> >> On Tue, Feb 6, 2018 at 5:09 PM, Антонио Антуан 
>>> >> wrote:
>>> >>> Hello Mike!
>>> >>> First of all, thank you for your help with this problem. Me and my
>>> >>> crew
>>> >>> appreciate it.
>>> >>>
>>> >>>
>>> >>> I have a question in case of `identity_token` for `update` and
>>> >>> `delete`
>>> >>> methods of `Query` instances.
>>> >>>
>>> >>> If we take a look on sqlalchemy.orm.persistence.BulkDelete._do_exec,
>>> >>> we see,
>>> >>> that there is no additonal kwargs passed to `execute` method. So I
>>> >>> don't see
>>> >>> any way to pass additional kwargs to Session.get_bind. Here is code
>>> >>> of 1.0
>>> >>> version:
>>> >>>
>>> >>> def _do_exec(self):
>>> >>> delete_stmt = sql.delete(self.primary_table,
>>> >>>  self.context.whereclause)
>>> >>>
>>> >>> self.result = self.query.session.execute(
>>> >>> delete_stmt,
>>> >>> params=self.query._params,
>>> >>> mapper=self.mapper,
>>> >>> # need to pass here additional kwargs
>>> >>> )
>>> >>> self.rowcount = self.result.rowcount
>>> >>>
>>> >>> Code from master branch (almost the same):
>>> >>>
>>> >>> def _execute_stmt(self, stmt):
>>> >>> self.result = self.query.session.execute(
>>> >>> stmt, params=self.query._params,
>>> >>> mapper=self.mapper)
>>> >>> self.rowcount = self.result.rowcount
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> As you may remember, I have some kind of `CustomQuery` with
>>> >>> `set_bind`
>>> >>> method (like the same into
>>> >>> sqlalchemy.ext.horizontal_shard.ShardedQuery). So
>>> >>> I have `_bind_id` attribute within instances of that class.
>>> >>> My question is: Is there any way to pass `self.query._bind_id` to
>>> >>> `self.query.session.execute` (see above)?
>>> >>
>>> >> for bulk operations to work with sharding it would need to be able to
>>> >> emit the UPDATE or DELETE statement across multiple shards and work in
>>> >> a similar way to ShardedQuery._execute_and_instances, giving it the
>>> >> chance to consult with query_chooser.
>>> >>
>>> >> can you please see if you can work with this? thanks
>>> >>
>>> >> diff --git a/lib/sqlalchemy/orm/persistence.py
>>> >> b/lib/sqlalchemy/orm/persistence.py
>>> >> index dc0ae1c38..6c55dee92 100644
>>> >> --- a/lib/sqlalchemy/orm/persistence.py
>>> >> +++ b/lib/sqlalchemy/orm/persistence.py
>>> >> @@ -1327,9 +1327,8 @@ class BulkUD(object):
>>> >>  self._do_post()
>>> >>
>>> >>  def _execute_stmt(self, stmt):
>>> >> -self.result = self.query.session.execute(
>>> >> -stmt, params=self.query._params,
>>> >> -mapper=self.mapper)
>>> >> +self.result = self.query._execute_crud(stmt, self.mapper)
>>> >> +
>>> >>  self.rowcount = self.result.rowcount
>>> >>
>>> >>  @util.dependencies("sqlalchemy.orm.query")
>>> >> diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
>>> >> index 4f7b22cc9..91adfb79d 100644
>>> >> --- a/lib/sqlalchemy/orm/query.py
>>> >> +++ b/lib/sqlalchemy/orm/query.py
>>> >> @@ -2901,6 +2901,12 @@ class Query(object):
>>> >>  result = conn.execute(querycontext.statement, self._params)
>>> >>  return loading.instances(querycontext.query, result,
>>> >> querycontext)
>>> >>
>>> >> +def _execute_crud(self, stmt, mapper):
>>> >> +conn = self._connection_from_session(
>>> >> +

Re: [sqlalchemy] pass additional kwargs to execute into BulkUD _do_exec method

2018-02-22 Thread Антонио Антуан
Hello Mike.
Finally got to this. Works perfectly with synchronize_session equals False
or 'evaluate'.

Got a problem when synchronize_session='fetch'. In that case used
BulkFetch._do_pre_synchronize (as you know :)). There is query instance
with set `_bind_id` attribute, but it don't passed through `execute`.

Here is an example that can illustrate that problem:
https://gist.github.com/aCLr/a992ca92138aeee86bf9432693be6d6c
It contains Query and Session implementations, which used in my project:
MultiBoundQuery and MultiBoundSession.
Also, it contains `delete` and `update` execution and checking for update
and delete operation. Tracebacks included.



вт, 13 февр. 2018 г. в 20:37, Антонио Антуан :

> I understand. Excuse me, drown in work. Going to test your code in a
> couple of days.
>
> вт, 13 февр. 2018 г., 20:32 Mike Bayer :
>
>> i don't plan to move on this until I get feedback.
>>
>> On Tue, Feb 6, 2018 at 7:32 PM, Mike Bayer 
>> wrote:
>> > code review at
>> https://gerrit.sqlalchemy.org/#/c/zzzeek/sqlalchemy/+/656
>> > and ideally would include basic ShardedQuery support
>> >
>> > On Tue, Feb 6, 2018 at 7:23 PM, Mike Bayer 
>> wrote:
>> >> On Tue, Feb 6, 2018 at 5:09 PM, Антонио Антуан 
>> wrote:
>> >>> Hello Mike!
>> >>> First of all, thank you for your help with this problem. Me and my
>> crew
>> >>> appreciate it.
>> >>>
>> >>>
>> >>> I have a question in case of `identity_token` for `update` and
>> `delete`
>> >>> methods of `Query` instances.
>> >>>
>> >>> If we take a look on sqlalchemy.orm.persistence.BulkDelete._do_exec,
>> we see,
>> >>> that there is no additonal kwargs passed to `execute` method. So I
>> don't see
>> >>> any way to pass additional kwargs to Session.get_bind. Here is code
>> of 1.0
>> >>> version:
>> >>>
>> >>> def _do_exec(self):
>> >>> delete_stmt = sql.delete(self.primary_table,
>> >>>  self.context.whereclause)
>> >>>
>> >>> self.result = self.query.session.execute(
>> >>> delete_stmt,
>> >>> params=self.query._params,
>> >>> mapper=self.mapper,
>> >>> # need to pass here additional kwargs
>> >>> )
>> >>> self.rowcount = self.result.rowcount
>> >>>
>> >>> Code from master branch (almost the same):
>> >>>
>> >>> def _execute_stmt(self, stmt):
>> >>> self.result = self.query.session.execute(
>> >>> stmt, params=self.query._params,
>> >>> mapper=self.mapper)
>> >>> self.rowcount = self.result.rowcount
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> As you may remember, I have some kind of `CustomQuery` with `set_bind`
>> >>> method (like the same into
>> sqlalchemy.ext.horizontal_shard.ShardedQuery). So
>> >>> I have `_bind_id` attribute within instances of that class.
>> >>> My question is: Is there any way to pass `self.query._bind_id` to
>> >>> `self.query.session.execute` (see above)?
>> >>
>> >> for bulk operations to work with sharding it would need to be able to
>> >> emit the UPDATE or DELETE statement across multiple shards and work in
>> >> a similar way to ShardedQuery._execute_and_instances, giving it the
>> >> chance to consult with query_chooser.
>> >>
>> >> can you please see if you can work with this? thanks
>> >>
>> >> diff --git a/lib/sqlalchemy/orm/persistence.py
>> >> b/lib/sqlalchemy/orm/persistence.py
>> >> index dc0ae1c38..6c55dee92 100644
>> >> --- a/lib/sqlalchemy/orm/persistence.py
>> >> +++ b/lib/sqlalchemy/orm/persistence.py
>> >> @@ -1327,9 +1327,8 @@ class BulkUD(object):
>> >>  self._do_post()
>> >>
>> >>  def _execute_stmt(self, stmt):
>> >> -self.result = self.query.session.execute(
>> >> -stmt, params=self.query._params,
>> >> -mapper=self.mapper)
>> >> +self.result = self.query._execute_crud(stmt, self.mapper)
>> >> +
>> >>  self.rowcount = self.result.rowcount
>> >>
>> >>  @util.dependencies("sqlalchemy.orm.query")
>> >> diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
>> >> index 4f7b22cc9..91adfb79d 100644
>> >> --- a/lib/sqlalchemy/orm/query.py
>> >> +++ b/lib/sqlalchemy/orm/query.py
>> >> @@ -2901,6 +2901,12 @@ class Query(object):
>> >>  result = conn.execute(querycontext.statement, self._params)
>> >>  return loading.instances(querycontext.query, result,
>> querycontext)
>> >>
>> >> +def _execute_crud(self, stmt, mapper):
>> >> +conn = self._connection_from_session(
>> >> +mapper=mapper, clause=stmt, close_with_result=True)
>> >> +
>> >> +return conn.execute(stmt, self._params)
>> >> +
>> >>  def _get_bind_args(self, querycontext, fn, **kw):
>> >>  return fn(
>> >>  mapper=self._bind_mapper(),
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>>
>> >>> --
>> >>> SQLAlchemy -
>> >>> The Python SQL Toolkit and Object Relational Mapper
>> >>>
>> >>> 

Re: [sqlalchemy] Problems with versioning when using a class mapped on a select of a table in SQLAlchemy 1.2

2018-02-22 Thread Jeff Horemans
Ok, thanks for the quick response! Keep up the exceptionally good work!

Op woensdag 21 februari 2018 17:19:44 UTC+1 schreef Mike Bayer:
>
> On Wed, Feb 21, 2018 at 10:04 AM, Mike Bayer  > wrote: 
> > On Wed, Feb 21, 2018 at 6:33 AM, Jeff Horemans  > wrote: 
> >> When migrating from 1.1 tot 1.2, we noticed that the version_id is not 
> >> getting set on a class that maps to a select of a table. 
> >> I've added a test case class below to the test_versioning.py included 
> in 
> >> SQLAlchemy to confirm this behaviour. 
> >> This case runs fine in versions 1.0 and 1.1, but gives a KeyError on 
> the 
> >> version_id in 1.2 as shown in the stack traces below. 
> >> 
> >> I'll be happy to make an issue on the repository if needed. 
> > 
> > no need, this is completely perfect, I'll create the issue.  I hope to 
> > get this into 1.2.4 which I need to release hopefully this week / 
> > today preferred (but I've wanted to release for two days already, ran 
> > out of time).   thanks! 
>
> this is 
> https://bitbucket.org/zzzeek/sqlalchemy/issues/4193/versioning-logic-fails-for-mapping-against
>  
> , where you'll note that I found two more issues with versioning and 
> mappers to select statements, which you've already been working 
> around.  this is not a use case we tested for. 
>
>
> > 
> > 
> >> 
> >> class VersioningSelectTest(fixtures.MappedTest): 
> >> 
> >> __backend__ = True 
> >> 
> >> @classmethod 
> >> def define_tables(cls, metadata): 
> >> Table('version_table', metadata, 
> >>   Column('id', Integer, primary_key=True, 
> >>  test_needs_autoincrement=True), 
> >>   Column('version_id', Integer, nullable=False), 
> >>   Column('value', String(40), nullable=False)) 
> >> 
> >> @classmethod 
> >> def setup_classes(cls): 
> >> class Foo(cls.Basic): 
> >> pass 
> >> 
> >> def _fixture(self): 
> >> Foo, version_table = self.classes.Foo, 
> self.tables.version_table 
> >> 
> >> current = version_table.select().where(version_table.c.id > 
> >> 0).alias('current_table') 
> >> 
> >> mapper(Foo, current, version_id_col=version_table.c.version_id) 
> >> s1 = Session() 
> >> return s1 
> >> 
> >> @testing.emits_warning(r".*versioning cannot be verified") 
> >> def test_multiple_updates(self): 
> >> Foo = self.classes.Foo 
> >> 
> >> s1 = self._fixture() 
> >> f1 = Foo(value='f1') 
> >> f2 = Foo(value='f2') 
> >> s1.add_all((f1, f2)) 
> >> s1.commit() 
> >> 
> >> f1.value = 'f1rev2' 
> >> f2.value = 'f2rev2' 
> >> s1.commit() 
> >> 
> >> eq_( 
> >> s1.query(Foo.id, Foo.value, 
> >> Foo.version_id).order_by(Foo.id).all(), 
> >> [(f1.id, 'f1rev2', 2), (f2.id, 'f2rev2', 2)] 
> >> ) 
> >> 
> >> 
> >> 
> >> FAIL 
> >> 
> test/orm/test_versioning.py::VersioningSelectTest_postgresql+psycopg2_9_5_11::()::test_multiple_updates
>  
>
> >> 
> >> == 
> FAILURES 
> >> == 
> >> ___ 
> >> VersioningSelectTest_postgresql+psycopg2_9_5_11.test_multiple_updates 
> >>  
> >> Traceback (most recent call last): 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/orm/test_versioning.py",
>  
>
> >> line 131, in test_multiple_updates 
> >> s1.commit() 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py",
>  
>
> >> line 943, in commit 
> >> self.transaction.commit() 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py",
>  
>
> >> line 467, in commit 
> >> self._prepare_impl() 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py",
>  
>
> >> line 447, in _prepare_impl 
> >> self.session.flush() 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py",
>  
>
> >> line 2243, in flush 
> >> self._flush(objects) 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py",
>  
>
> >> line 2369, in _flush 
> >> transaction.rollback(_capture_exception=True) 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/util/langhelpers.py",
>  
>
> >> line 66, in __exit__ 
> >> compat.reraise(exc_type, exc_value, exc_tb) 
> >>   File 
> >> 
> "/home/jeffh/vortex-workspace/v-finance/subrepos/SQLAlchemy-1.2.3/test/../lib/sqlalchemy/orm/session.py",
>  
>
> >> line 2333, in