(administrative stuff: my previous message seems to have gotten lost to the mailing list, thanks for fullquoting it).
On Tue, Apr 22, 2014 at 10:50:26AM -0700, Guido van Rossum wrote:
> I think it would be nice to iterate on this design a few more times
> until it is general and robust and clear enough to be added to the
> asyncio module as a standard helper abstraction for such cases.
i'll try to not get lost in other projects another time while this
unfolds :-)
> I'm still curious -- why are you rejecting using a Queue in the
> implementation? Your stated reason "if we wanted a Queue we'd have to
> add a limit" doesn't make sense to me.
it's meant the other way round: if there is a limit then we need a
queue, but more precisely, if we want to allow a limit, the callers must
be ready to wait for their set-result calls to return.
(and allowing a limit would be a good thing imo lest the queues explode)
what initially drove me away from your (is_value, value) queued tuples
towards dedicated Future objects is that i want to allow exceptions to
travel up through the consumer -- eg. if the `yield from
stream.readline()` raises something funny, i want that to raise from
where i'm iterating over the lines.
several more iterations led me back to your original proposal, now with
some safeguards and modified semantics.
(i'd prefer the semantics to be that the yield-from part can be called as
often as desired with no ill effects, and the assigning step to be the
driving one, borrowing from queue' or select/read's semantics.)
a new concept in the latest proposal is an easy way to decorate a
coroutine to be a generator. for example:
@cogenerator()
def count_slowly(asyncyield):
yield from asyncyield("good morning")
for i in range(3):
yield from sleep(1)
yield from asyncyield("counted %d"%i)
or, borrowing your suggested syntax to chain them more easily:
@cogenerator()
def filtered_results(self, asyncyield, search_term):
for item yield in self.all_results():
if item.is_hidden:
continue
if (yield from censorshipserver.is_banned(item)):
continue
if search_term not in item.title:
continue
yield from asyncyield(item)
note that due to the decorator-injected explicit asyncyield, the
cogenerator would be called as `self.filtered_results(search_term)`.
a current implementation called QueueWithEnd is attached; i've uploaded
the development history of proposals to [1] for easier review and
containing examples.
best regards
chrysn
[1]
https://gitorious.org/asyncio-for-loop-replacement/asyncio-for-loop-replacement
import abc
import enum
import asyncio
class AsyncIterable(metaclass=abc.ABCMeta):
@abc.abstractmethod
@asyncio.coroutine
def can_peek(self):
"""Return True when a result is ready to be fetched with .get_nowait(),
and False when no more items can be fetched."""
@abc.abstractmethod
@asyncio.coroutine
def get_nowait(self):
"""Fetch the next item. This must only be called once after can_peek
has returned True."""
class QueueWithEnd(AsyncIterable):
"""A QueueWithEnd shares a Queue's behavior in that it gets fed with put
and consumed with get_nowait. Contrary to a Queue, this is designed to be
consumed only by one entity, which uses the coroutine can_peek to make sure
the get_nowait will succeed.
Another difference between a Queue and a QueueWithEnd is that the latter
can also terminate (which is indicated by can_peek returning False and set
by the finish coroutine) and raise exceptions (which raise from the
get_nowait function and are set by the put_exception coroutine).
"""
Type = enum.Enum("QueueWithEnd.Type", "notpeeked value end exception")
def __init__(self, maxsize=0):
# (type, value)
self._queue = asyncio.Queue(maxsize)
self._ended = False
self._flag = self.Type.notpeeked
def __repr__(self):
return "<%s flag %s%s>" % (type(self).__name__, self._flag, " (%s)" %
self._value if self._flag in (self.Type.value,
self.Type.exception) else "")
# AsyncIterable interface
@asyncio.coroutine
def can_peek(self):
if self._flag is not self.Type.notpeeked:
return True
self._flag, self._value = yield from self._queue.get()
return self._flag is not self.Type.end
def get_nowait(self):
if self._flag in (self.Type.notpeeked, self.Type.end):
raise asyncio.QueueEmpty()
elif self._flag is self.Type.exception:
raise self._value
else:
self._flag = self.Type.notpeeked
return self._value
# feeder interface
@asyncio.coroutine
def put(self, value):
if self._ended:
raise asyncio.InvalidStateError("%s has already ended"%type(self).__name__)
yield from self._queue.put((self.Type.value, value))
@asyncio.coroutine
def put_exception(self, value):
if self._ended:
raise asyncio.InvalidStateError("%s has already ended"%type(self).__name__)
self._ended = True
yield from self._queue.put((self.Type.exception, value))
@asyncio.coroutine
def finish(self):
if self._ended:
raise asyncio.InvalidStateError("%s has already ended"%type(self).__name__)
self._ended = True
yield from self._queue.put((self.Type.end, None))
# a simple way to create a feeder with something like an explicit yield
@classmethod
def cogenerator(cls, maxsize=0):
"""Coroutine decorator that passes a callable `asyncyield` into the function
as the first argument and returns a QueueWithEnd. It is implicitly
finished when the coroutine returns.
>>> @QueueWithEnd.cogenerator()
>>> def count_slowly(asyncyield, count_to=count_to):
... for i in range(count_to):
... yield from asyncio.sleep(1)
... yield from asyncyield(i + 1)
>>> counter = count_slowly(10)
>>> while (yield from counter.can_peek()):
... i = counter.get_nowait()
... print("Current count is %d"%i)
"""
def decorate(function):
cofun = asyncio.coroutine(function)
def wrapped(*args, **kwargs):
result = cls(maxsize=maxsize)
def guarding():
running = cofun(result.put, *args, **kwargs)
try:
yield from running
except Exception as e:
yield from result.put_exception(e)
else:
yield from result.finish()
asyncio.Task(guarding())
return result
return wrapped
return decorate
# implementing the Future interface -- note that it's neither a Future by
# inheritance, nor does it offer the complete Future interface; but it can
# be used in `for value in (yield from ...):`
def __iter__(self):
result = []
while (yield from self.can_peek()):
result.append(self.get_nowait())
return result
# compatibility to the original `Its` class
more = can_peek
value = property(get_nowait)
# another old name
consume = get_nowait
signature.asc
Description: Digital signature
