On Mon, 28 Apr 2008 16:22:44 -0400
Michael Bayer <[EMAIL PROTECTED]> wrote:
>
> so, let's name it _merge_ordering() to start so we have some future
> leeway on the name.
>
Changes made.
> Well, in fact the result.close() at the end there is not really
> needed; when all result rows are exhausted, ResultProxy will issue
> close() on the cursor anyway (and if the connection itself was
> created with close_with_result=True, the Connection will also be
> returned to the connection pool). So in this case it's probably
> better if shard.py were using session.execute() instead of
> session.connection(), since this allows the Session to make the
> choice, based on transactional state, if the Connection should be a
> close_with_result or not. I think the reason shard.py isn't doing
> that right now is because there were more parameters to be sent to
> connection() that ShardedSession adds, so ShardedSession would need a
> corresponding "execute()" method added which works in an analogous way
> to Session.execute().
Alright, I removed the result.close() hook code from the first patch.
Makes things considerably simpler. It did occur to me that the original
try-finally block that was doing this would also close the results in
the case of an exception being thrown. Should I deal with this case?
Also, I haven't looked into writing such an override for
ShardedQuery.execute(), but I suppose I could see about doing it if it
would make things cleaner.
>
> All in all it's a great patch and if you can make those adjustments
> that would be great. Thanks !
>
> - mike
Great :) .. I've gotten a lot of use out of SA, it's got a great
community and is a joy to hack on. Glad I can contribute something to
it.
I'd really like to throw more testing at _merge_ordering, I'll try to
come up with something stand-alone, and then see about working it into
SA's test suite. The framework looks quite involved and I haven't had a
chance to grok it yet.
Kyle
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at
http://groups.google.com/group/sqlalchemy?hl=en
-~----------~----~----~----~------~----~------~--~---
Use a generator to concatenate results from ShardedQuery objects.
Adds a function `iter_concatenating` that accepts a list of iterators, and
yields every object produced by each of them. Alter ShardedQuery to use this
function instead of loading all of the potentially large sets of result objects
into a list before concatenating them.
diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py
--- a/lib/sqlalchemy/orm/shard.py
+++ b/lib/sqlalchemy/orm/shard.py
@@ -88,20 +88,15 @@
def _execute_and_instances(self, context):
if self._shard_id is not None:
result = self.session.connection(mapper=self.mapper, shard_id=self._shard_id).execute(context.statement, **self._params)
- try:
- return iter(self.instances(result, querycontext=context))
- finally:
- result.close()
+ return self.iterate_instances(result, querycontext=context)
else:
- partial = []
+ partials = []
for shard_id in self.query_chooser(self):
result = self.session.connection(mapper=self.mapper, shard_id=shard_id).execute(context.statement, **self._params)
- try:
- partial = partial + list(self.instances(result, querycontext=context))
- finally:
- result.close()
+ partials.append(self.iterate_instances(result, querycontext=context))
+
# if some kind of in memory 'sorting' were done, this is where it would happen
- return iter(partial)
+ return util.iter_concatenating(partials)
def get(self, ident, **kwargs):
if self._shard_id is not None:
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -229,6 +229,21 @@
yield y
else:
yield elem
+
+def iter_concatenating(iters):
+ """Concatenate iterables using a generator.
+
+ Yields every item produced by every iterable in the given list, in
+ concatenated order.
+ """
+ for iterable in iters:
+ try:
+ while True:
+ yield iterable.next()
+
+ # Begin yielding stuff off the next iterator.
+ except StopIteration:
+ pass
class ArgSingleton(type):
instances = weakref.WeakValueDictionary()
Add ability to do ordered merge of ShardedQuery results.
`iter_merging` creates an iterator that yields an ordered merge of the outputs
of the list of iterators passed in. The ordering is determined by a function
`ordering` which takes a list of objects and returns the member that is next in
the ordering.
`ShardedQuery._merge_ordering()` accepts such a function for sorting query
results and creates a ShardedQuery that will merge the results from each shard
using it.
diff --git a/lib/sqlalchemy/orm/shard.py b/lib/sqlalchemy/orm/shard.py
--- a/lib/sqlalchemy/orm/shard.py
+++ b/lib/sqlalchemy/orm/shard.py
@@ -68,6 +68,7 @@
self.id_chooser = self.session.id_chooser
self.query_chooser = self.session.query_chooser
self._shard_id = None
+ self._merge_ordering_fun = None
def _clone(self):
q = ShardedQuery.__new__(ShardedQuery)
@@ -96,7 +97,10 @@
partials.append(self.iterate_instances(result, querycontext=context))
# if some kind of in memory 'sorting' were done, this is where it would happen
- return util.iter_concatenating(partials)
+ if self._merge_ordering_fun:
+ return util.iter_merging(partials, self._merge_ordering_fun)
+ else:
+ return util.iter_concatenating(partials)
def get(self, ident, **kwargs):
if self._shard_id is not None:
@@ -120,3 +124,23 @@
return o
else:
raise exceptions.InvalidRequestError("No instance found for identity %s" % repr(ident))
+
+ def _merge_ordering(self, ordering):
+ """Set an ordering function to merge the shard results.
+
+ This function expects a callable that accepts a list of items being
+ returned from the query. The callable should return the member of the
+ list that comes first in merge order.
+
+ Note that this will only enforce a complete ordering of the results if
+ the query itself contains an ``order_by`` that will order the results
+ from each shard the same way as the ordering function provided to
+ `_merge_ordering`:
+
+ >>> q = query(Person).order_by([Person.age.asc()])
+ >>> mq = q._merge_ordering(lambda l: min(l, key=(lambda x: x.age)))
+ """
+
+ q = self._clone()
+ q._merge_ordering_fun = ordering
+ return q
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -244,6 +244,65 @@
# Begin yielding stuff off the next iterator.
except StopIteration:
pass
+
+def iter_merging(iters, ordering=min):
+ """Merge iterables using a generator and an ordering function.
+
+ Yields every item produced by every iterable in the given list, ordered as
+ selected by the merge ordering function. The function always emits every
+ item from every iterable once, but the order of the output will only be
+ correct if the iterables are ordered by the same ordering used by the
+ merge.
+
+ ordering
+ A function accepting a list. The function should return the member
+ of this list that comes next in the merge ordering. Defaults to
+ built-in function min.
+ """
+
+ def next_with_stopped_flag(iter):
+ """Get the next thing coming from the iterator.
+
+ Returns a tuple, containing either the item gotten with False (not
+ stopped), or None with True (iterator stopped).
+ """
+ try:
+ return iter.next(), False
+ except StopIteration:
+ return None, True
+
+ # I use iters_stopped as a list of flags indicating when each iterator
+ # ends, such that the domain of things that can be merge iter'ed does not
+ # exclude some sentinel value.
+
+ iter_heads = [] # Holds the last object generated by each iterator that
+ # hasn't been merged yet; a staging area.
+ iters_stopped = [] # Flags indicating whether each iterator has stopped.
+
+ # Prime the head list
+ for iterable in iters:
+ head, stopped = next_with_stopped_flag(iterable)
+ iter_heads.append(head)
+ iters_stopped.append(stopped)
+
+ # Run until all iterators stop
+ while not min(iters_stopped):
+
+ # Gather a list of candidate values
+ candidate_values = [val for val, stopped
+ in zip(iter_heads, iters_stopped) if not stopped]
+
+ # Pick the next thing in the merge order and yield it
+ yielded_value = ordering(candidate_values)
+ yield yielded_value
+
+ for i in range(len(iters)):
+
+ # Whichever item in the head list got yielded, replace it with the
+ # next thing from its iterator.
+ if yielded_value is iter_heads[i]:
+ iter_heads[i], iters_stopped[i] = next_with_stopped_flag(iters[i])
+ break
class ArgSingleton(type):
instances = weakref.WeakValueDictionary()
diff --git a/test/orm/sharding/shard.py b/test/orm/sharding/shard.py
--- a/test/orm/sharding/shard.py
+++ b/test/orm/sharding/shard.py
@@ -155,6 +155,17 @@
asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_(['Europe', 'Asia']))
assert set([c.city for c in asia_and_europe]) == set(['Tokyo', 'London', 'Dublin'])
+ city_expected_order = sorted([o.city for o in [tokyo, newyork, toronto, london, dublin, brasilia, quito]])
+ # query_chooser throws an exception without a filter expression
+ base_query = sess.query(WeatherLocation).filter(WeatherLocation.continent != "Antarctica")
+
+ merge_ordered = base_query.order_by([WeatherLocation.city]
+ )._merge_ordering(lambda wxlocs: min(wxlocs, key=(lambda wxloc: wxloc.city)))
+ assert [c.city for c in merge_ordered] == city_expected_order
+
+ merge_ordered_rev = base_query.order_by([WeatherLocation.city.desc()]
+ )._merge_ordering(lambda wxlocs: max(wxlocs, key=(lambda wxloc: wxloc.city)))
+ assert [c.city for c in merge_ordered_rev] == list(reversed(city_expected_order))
if __name__ == '__main__':