On Nov 6, 2011, at 3:54 AM, AZ (Wei-Ning Huang) wrote:

> Hi list,
> 
> I've been getting ResoruceClosedError with MySQL server, which worked
> fine with SQLite.
> I'am using Celery queue, and those erros always happen within the
> celery tasks.

For background on this message, there's a post just a day ago talking about it 
(there, the issue seems to be occurring in conjunction with the "update" 
lockmode: 
https://groups.google.com/forum/#!topic/sqlalchemy/MBRcTJzi8Kk/discussion ).    
If the query executed is an UPDATE or similar, the statement returns no 
results.   The DBAPI will not provide a cursor.description and thus is 
considered to have no rows to return.  SQLAlchemy then closes it automatically.

If the statement emitted was truly a SELECT statement, and then 
cursor.description is None as a result of that statement, it's likely a bug in 
MySQL or MySQLdb you're coming across (these kinds of errors are very common in 
the MySQL world).    I'd suggest looking very closely at the emitted SQL using 
echoing, and attempt to create the same SQL conversation using a plain DBAPI 
cursor.

Another thing to be careful of as I see you're talking about "multiple workers" 
is to absolutely make sure that an individual MySQL connection is never ever 
accessed by multiple threads.   I'm not sure what mechanism you have behind 
your get_by()/delete_by()/etc. mechanism there but hopefully you're using a 
thread-scoped session, and ensuring that all objects associated with that 
session are kept local to a single thread.

When I do a jobqueue type of thing I tend to SELECT a bunch of rows (ideally 
with the FOR UPDATE option so that they are locked), then immediately UPDATE a 
status flag on those rows which changes it them from "PENDING" to "QUEUED", 
thereby preventing concurrent workers from getting at those rows - I now have 
their primary keys in memory so I can then continue working with them.   The 
rows are identified for UPDATE either via "WHERE status='PENDING'" or based on 
the subset of PKs I have "WHERE id IN (x, y, z)".   I don't get into things 
like comparison of timestamps since that approach can easily produce conflicts. 
 

The str() call on time() ( is that datetime.time() or time.time()?) seems 
unnecessary.   If the timestamp is actually the epoch, i.e. time.time(), I'd 
probably not store that in the database directly, I'd instead go for a full 
date/time value using a DATETIME or TIMESTAMP type (and I'd store it in UTC).   

If concurrent workers are hitting pop_timeout() its likely that multiple 
workers will have the same value for time(), so they'd be hitting the same row 
in those cases.   Perhaps the row is being deleted at the same moment another 
worker is trying to SELECT it, and MySQL's sloppy handling of things is giving 
you a broken cursor.    Depends on if the error occurs all the time or only 
under load.

> 
> @celery.task
> def do_something():
>      records = SomeTable.pop_timeout()
>      ....
> 
> and for SomeTable.pop_timeout(), the pop_timeout method is as follows:
> 
> class SomeTable(DeclarativeBase):
>    ...
>    def pop_timeout(klass):
>        stamp = str(time())
>        now = time()
>        klass.query(klass).filter((klass.transtamp == None) & (now -
> klass.timestamp > TIMEOUT)).update({'transtamp': stamp})
>        klass.commit()
>        records = klass.get_by(transtamp=stamp, order_by=order_by,
> eager=eager)
>        klass.delete_by(transtamp=stamp)
>        klass.commit()
>        return records








> 
> And the tracebak is:
>    caches = ImageCache.pop_timeout()
>  File "/home/aitjcize/Work/shotwill/shotwill/backend/database.py",
> line 1081, i
> n pop_timeout
>    now - ImageCache.timestamp >= app.config['IMAGE_CACHE_TIMEOUT']
>  File "/home/aitjcize/Work/shotwill/shotwill/backend/database.py",
> line 226, in
> pop
>    records = klass.get_by(transtamp=stamp, order_by=order_by,
> eager=eager)
>  File "/home/aitjcize/Work/shotwill/shotwill/backend/database.py",
> line 204, in
> get_by
>    return query_object.all()
>  File "/home/aitjcize/Work/shotwill/python/SQLAlchemy-0.7.2-py2.7-
> linux-x86_64.
> egg/sqlalchemy/orm/query.py", line 1729, in all
>    return list(self)
>  File "/home/aitjcize/Work/shotwill/python/SQLAlchemy-0.7.2-py2.7-
> linux-x86_64.
> egg/sqlalchemy/orm/query.py", line 1953, in instances
>    fetch = cursor.fetchall()
>  File "/home/aitjcize/Work/shotwill/python/SQLAlchemy-0.7.2-py2.7-
> linux-x86_64.
> egg/sqlalchemy/engine/base.py", line 2979, in fetchall
>    l = self.process_rows(self._fetchall_impl())
>  File "/home/aitjcize/Work/shotwill/python/SQLAlchemy-0.7.2-py2.7-
> linux-x86_64.
> egg/sqlalchemy/engine/base.py", line 2948, in _fetchall_impl
>    self._non_result()
>  File "/home/aitjcize/Work/shotwill/python/SQLAlchemy-0.7.2-py2.7-
> linux-x86_64.
> egg/sqlalchemy/engine/base.py", line 2953, in _non_result
>    "This result object does not return rows. "
> ResourceClosedError: This result object does not return rows. It has
> been closed
> automatically.
> 
> Does anyone has any idea what is going on? It only happen when using
> mysql.
> And another thing, because there maybe multiple worker executing the
> same celery task
> at the same time, so the pop_timeout() function is what I came up
> with. Is there a better
> solution for popping records from a database?
> 
> Thanks in advance.
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "sqlalchemy" group.
> To post to this group, send email to [email protected].
> To unsubscribe from this group, send email to 
> [email protected].
> For more options, visit this group at 
> http://groups.google.com/group/sqlalchemy?hl=en.
> 

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/sqlalchemy?hl=en.

Reply via email to