Thank you Mike!!!
Sorry about that - my example had errors :(
In reality, I am calling `celery_state_session.flush()` and
`celery_state_session.commit()` right after I update any of the `cs.xxxx`
values.
@app.task(
base=BaseTask,
bind=True
)
def my_task(self, ...):
# Persist checkpoint 'started'
cs = celery_state_session.query(CeleryTask).get(self.request.id)
cs.started = datetime.now()
celery_state_session.flush()
celery_state_session.commit()
try:
with transaction.manager:
mymodel = domain_model_session.query(MyModel).one()
mymodel.value = "..."
*# Persist checkpoint 'value updated' <<< This checkpoint is
never persisted !!!!*
cs = celery_state_session.query(CeleryTask).get(self.request.id)
cs.model_value_updated = datetime.now()
celery_state_session.flush()
celery_state_session.commit()
except Exception as e:
*# Persist checkpoint 'transaction failed' <<< This checkpoint
is never persisted !!!!*
cs = celery_state_session.query(CeleryTask).get(self.request.id)
cs.failed_with_exception = datetime.now()
celery_state_session.flush()
celery_state_session.commit()
finally:
# Persist checkpoint 'celery task completed anyhow'
cs = celery_state_session.query(CeleryTask).get(self.request.id)
cs.failed_with_exception = datetime.now()
celery_state_session.flush()
celery_state_session.commit()
Couple of clarifications that might be worth mentioning:
The `*# persist checkpoint*` is in reality a function kinda like this:
def checkpoint(self, name):
eng = engine_from_config(self.app.conf['PYRAMID_REGISTRY'].settings)
eng.update_execution_options(autocommit=True, autoflush=False)
factory = sessionmaker()
factory.configure(bind=eng)
celery_state_session = factory()
celery_state = celery_state_session.query(..).get(..)
celery_state.value = ...
celery_state_session.flush()
celery_state_session.commit()
celery_state_session.close()
Notice I create a brand new 'eng` and a brand new `celery_state_session`
each time I call my checkpoint function. I am not using `scoped_session`
either. Perhaps that's all incorrect?
I have many celery tasks running at the same time changing the same
`mymodel` row so concurrent updates are typical.
When all is good the SQL log shows a BEGIN (implicit) followed by UPDATE
and finally a COMMIT for mymodel and my celery_state.
But whenever `psycopg2.extensions.TransactionRollbackError` happens, I see
a COMMIT but no UPDATE for `celery_state` preceding it.
Did the `celery_state_session` got bound to the `transaction.manager`
automatically? because threadlocals?
What's the correct way to create a session that is not automatically bound
to any transaction or that can be explicitly bound to an isolated
transaction that can be committed whenever?
On Thursday, June 21, 2018 at 7:37:52 PM UTC-5, Mike Bayer wrote:
>
> From looking at your code example, I don't see you calling
> celery_session.flush() or celery_session.commit(), so nothing is going to
> persist. Just setting an attribute value does not send any SQL.
>
> On Thu, Jun 21, 2018, 7:22 PM HP3 <[email protected] <javascript:>>
> wrote:
>
>> Hello all:
>>
>> Within a celery task, I need to have 2 unrelated sessions that can
>> commit/rollback independently of each other:
>> One session (`domain_model_session`) performs vanilla domain model
>> operations and the other (`celery_state_session`) is to persist the state
>> of the celery task itself **isolated from the domain model one**.
>>
>> *Imagine "checkpoints"*
>>
>> Something along these lines
>>
>> @task
>> def my task(...):
>>
>> *# Persist checkpoint 'started'*
>>
>> cs = celery_state_session.query(CeleryTask).get(self.request.id)
>> cs.started = datetime.now()
>> celery_state_session.commit()
>>
>> try:
>>
>>
>>
>> with transaction.manager:
>> mymodel = domain_model_session.query(MyModel).one()
>> mymodel.value = "..."
>>
>> *# Persist checkpoint 'value updated' *
>>
>> cs = celery_state_session.query(CeleryTask).get(self.request.id)
>> cs.model_value_updated = datetime.now()
>>
>> except Exception as e:
>>
>>
>> * # Persist checkpoint 'transaction failed' <<< This
>> checkpoint is never persisted !!!!*
>>
>> cs = celery_state_session.query(CeleryTask).get(self.request.id)
>> cs.failed_with_exception = datetime.now()
>>
>> finally:
>>
>>
>> *# Persist checkpoint 'celery task completed anyhow' *
>>
>> cs = celery_state_session.query(CeleryTask).get(self.request.id)
>> cs.failed_with_exception = datetime.now()
>>
>>
>>
>> I've been trying multiple things but I can't get the behavior I need.
>>
>> No matter what, whenever the transaction associated to
>> `domain_model_session` fails it "rolls back" all operations performed by
>> `celery_state_session` as if `celery_state_session` was automatically bound
>> to the same transaction.manager.
>>
>> How am I supposed to create a session that works isolated?
>>
>>
>>
>> --
>> SQLAlchemy -
>> The Python SQL Toolkit and Object Relational Mapper
>>
>> http://www.sqlalchemy.org/
>>
>> To post example code, please provide an MCVE: Minimal, Complete, and
>> Verifiable Example. See http://stackoverflow.com/help/mcve for a full
>> description.
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "sqlalchemy" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected] <javascript:>.
>> To post to this group, send email to [email protected]
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/sqlalchemy.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.