Hello.
I modified the recipe at
http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowedRangeQuery
to better suit my needs. Perhaps someone else will find this useful:
####################
#### CODE BEGIN ####
####################
from sqlalchemy.orm import subqueryload
from sqlalchemy.sql.expression import distinct


def windowed_query(filter_query, data_query, column, window_size):
    """Perform (a correct) yield_per() operation. See WindowedQuery.yield_per()
    for more.

    EXAMPLE:
        gen = windowed_query(
            filter_query=session.query(Foo).filter(Foo.name.like(u'%foo%')),
            data_query=session.query(Foo).options(subqueryload(Foo.bars)),
            column=Foo.id,
            window_size=50,
        )
        for each_foo in gen:
            print each_foo.name
            for each_bar in each_foo.bars:
                print each_bar
    """
    return WindowedQuery(filter_query, data_query, column).yield_per(window_size)

class WindowedQuery(object):
    """Perform (a correct) yield_per() operation."""

    def __init__(self, filter_query, data_query, column):
        # A query with NO options(...) and NO order_by(...). It MUST contain
        # all the join() and filter() calls necessary to limit the result set
        # as desired.
        self._filter_query = filter_query
        # A simple query with options() to fetch the actual data.
        self._data_query = data_query
        # id column of the (main) result ORM class.
        self._column = column

    def yield_per(self, window_size):
        """Process results in chunks.

        Steps:
        * Obtain the ids of ALL result rows via self._filter_query.
        * Divide the ids into chunks of equal size and perform ONE query for
          EACH chunk to fetch the data via self._data_query.

        A chunk is selected with the test q.filter(self._column.in_(chunk)).
        This is the only way that works in the presence of the read-committed
        isolation level: rows can appear or disappear between the individual
        queries, so only an explicit list of the ids seen up front pins down
        exactly which rows each chunk contains.
        """
        q = self._data_query
        for each_window in self._windows(window_size):
            for each_result in q.filter(each_window):
                yield each_result

    def _windows(self, window_size):
        chunk = []
        chunk_size = 0
        for each in self._q_column():
            if chunk_size < window_size:
                chunk.append(each)
                chunk_size += 1
            if chunk_size == window_size:
                yield self._window_for_chunk(chunk)
                chunk = []
                chunk_size = 0
        if chunk_size > 0:
            yield self._window_for_chunk(chunk)

    def _q_column(self):
        # distinct() ensures that each id is returned at most once despite
        # a possible multiplying effect of a join().
        return self._filter_query.with_entities(distinct(self._column))

    def _window_for_chunk(self, chunk):
        return self._column.in_(chunk)
##################
#### CODE END ####
##################
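
For completeness, here is a minimal, self-contained way to exercise the recipe
above. The Foo/Bar mapping, the in-memory sqlite URL and the window size are
only illustrative assumptions (they are not part of the recipe itself);
echo=True makes the id query and the chunked IN-queries visible in the log.

from sqlalchemy import Column, ForeignKey, Integer, Unicode, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker, subqueryload

Base = declarative_base()

class Foo(Base):
    __tablename__ = 'foo'
    id = Column(Integer, primary_key=True)
    name = Column(Unicode(100))
    bars = relationship('Bar', backref='foo')

class Bar(Base):
    __tablename__ = 'bar'
    id = Column(Integer, primary_key=True)
    foo_id = Column(Integer, ForeignKey('foo.id'))

engine = create_engine('sqlite://', echo=True)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

gen = windowed_query(
    filter_query=session.query(Foo).filter(Foo.name.like(u'%foo%')),
    data_query=session.query(Foo).options(subqueryload(Foo.bars)),
    column=Foo.id,
    window_size=50,
)
for each_foo in gen:
    print each_foo.name, len(each_foo.bars)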
MOTIVATION: I recently learned that Query.yield_per() does not work nicely in
combination with subqueryload(). The above recipe fixes that. Unfortunately,
its usage is not as elegant and simple as q.yield_per(...).
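
For context, this is the combination the recipe replaces (shown only as an
illustration, using the same hypothetical Foo/Bar mapping as above):

    q = session.query(Foo).options(subqueryload(Foo.bars)).yield_per(50)
    for each_foo in q:
        print each_foo.name  # the eagerly loaded each_foo.bars cannot be relied upon here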
If you have any idea how to accomplish the same with ONE query only (in SA
0.7.9):
    def windowed_query(query, column, window_size):
        query --magic-> filter_query
        query --magic-> data_query
        ...
I would very much like to hear about it.
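
One untested idea, just to make the question concrete (NOT verified against SA
0.7.9): Query.order_by(None) resets any ORDER BY, so the filter query could
perhaps be derived from the single input query as sketched below. Whether the
options() attached to the original query interfere with the id-only query
built by _q_column() is exactly the open question.

    def windowed_query(query, column, window_size):
        # Hypothetical one-query variant; unverified.
        filter_query = query.order_by(None)  # drop ORDER BY; options() remain attached
        data_query = query
        return WindowedQuery(filter_query, data_query, column).yield_per(window_size)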
PERFORMANCE: My first tests suggest that it might be an order of magnitude
faster than the Query.yield_per() we use now.
Note also that yield_per() with subqueryload() was still about twice as fast
as the same query without yield_per(), but I expect this to depend heavily on
the query.
WARNING: We do not use this in production yet. Use at your own risk.
Happy SA hacking,
Ladislav Lenart