(administrative note: my previous message seems to have gotten lost on
its way to the mailing list, thanks for fullquoting it. this message was
not shown on the list either; i'm trying to send it directly once more,
sorry for the noise.)

On Tue, Apr 22, 2014 at 10:50:26AM -0700, Guido van Rossum wrote:
> I think it would be nice to iterate on this design a few more times
> until it is general and robust and clear enough to be added to the
> asyncio module as a standard helper abstraction for such cases.

i'll try not to get lost in other projects again while this unfolds :-)

> I'm still curious -- why are you rejecting using a Queue in the
> implementation? Your stated reason "if we wanted a Queue we'd have to
> add a limit" doesn't make sense to me.

i meant it the other way round: if there is a limit, then we need a
queue. more precisely, if we want to allow a limit, the callers must be
ready to wait for their set-result calls to return.

(and allowing a limit would be a good thing imo, lest the queues explode)
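
to illustrate the waiting part, a minimal sketch (using a plain
asyncio.Queue with maxsize as the limit rather than the attached class):
once the queue is full, the producer's put suspends until the consumer
has made room, so the set-result side must itself be waited for.

    import asyncio

    @asyncio.coroutine
    def producer(queue):
        for i in range(5):
            # suspends here whenever two items are already pending
            yield from queue.put(i)
            print("produced %d" % i)

    @asyncio.coroutine
    def consumer(queue):
        for _ in range(5):
            yield from asyncio.sleep(0.1)
            print("consumed %d" % (yield from queue.get()))

    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(maxsize=2)
    loop.run_until_complete(asyncio.gather(producer(queue), consumer(queue)))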

what initially drove me away from your (is_value, value) queued tuples
towards dedicated Future objects is that i want exceptions to be able to
travel up to the consumer -- e.g. if the `yield from stream.readline()`
raises something funny, i want that to raise from where i'm iterating
over the lines.
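
as a sketch of what i'm after (this uses the attached QueueWithEnd;
`stream` stands for some StreamReader-like object and is only assumed
here): whatever readline raises inside the feeding coroutine gets
re-raised from the consumer's get_nowait.

    @QueueWithEnd.cogenerator()
    def lines(asyncyield, stream):
        while True:
            line = yield from stream.readline()  # may raise
            if not line:
                return
            yield from asyncyield(line)

    @asyncio.coroutine
    def print_lines(stream):
        source = lines(stream)
        while (yield from source.can_peek()):
            # a readline error from above surfaces right here
            print(source.get_nowait())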

several more iterations led me back to your original proposal, now with
some safeguards and modified semantics.

(i'd prefer the semantics to be that the yield-from part can be called as
often as desired with no ill effects, and the assigning step to be the
driving one, borrowing from queue's or select/read's semantics.)
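
in terms of the attached implementation that looks roughly like this
(sketch): can_peek caches the peeked item behind a flag, so asking again
costs nothing and loses nothing; only get_nowait moves on to the next
item.

    @asyncio.coroutine
    def drain(source):
        while (yield from source.can_peek()):
            # asking twice in a row is harmless, the second call just
            # reports the already-peeked item again
            assert (yield from source.can_peek())
            item = source.get_nowait()  # this is the step that advances
            print(item)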


a new concept in the latest proposal is an easy way to decorate a
coroutine so that it acts as a generator. for example:

    @cogenerator()
    def count_slowly(asyncyield):
        yield from asyncyield("good morning")
        for i in range(3):
            yield from sleep(1)
            yield from asyncyield("counted %d"%i)

or, borrowing your suggested syntax to chain them more easily:

    @cogenerator()
    def filtered_results(self, asyncyield, search_term):
        for item yield in self.all_results():
            if item.is_hidden:
                continue
            if (yield from censorshipserver.is_banned(item)):
                continue
            if search_term not in item.title:
                continue
            yield from asyncyield(item)

note that because the decorator injects the explicit asyncyield
argument, the cogenerator would be called as
`self.filtered_results(search_term)`, without passing asyncyield
yourself.
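
consuming it would then look roughly like this (a sketch, assuming the
attached QueueWithEnd semantics and a method on the same class):

    @asyncio.coroutine
    def show_results(self, search_term):
        results = self.filtered_results(search_term)
        while (yield from results.can_peek()):
            print(results.get_nowait())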

a current implementation called QueueWithEnd is attached; i've uploaded
the development history of the proposals to [1] for easier review,
including examples.

best regards
chrysn

[1] https://gitorious.org/asyncio-for-loop-replacement/asyncio-for-loop-replacement
import abc
import enum
import asyncio

class AsyncIterable(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    @asyncio.coroutine
    def can_peek(self):
        """Return True when a result is ready to be fetched with .get_nowait(),
        and False when no more items can be fetched."""

    @abc.abstractmethod
    def get_nowait(self):
        """Fetch the next item. This is a plain function, not a coroutine, and
        must only be called once after can_peek has returned True."""

class QueueWithEnd(AsyncIterable):
    """A QueueWithEnd shares a Queue's behavior in that it gets fed with put
    and consumed with get_nowait. Contrary to a Queue, this is designed to be
    consumed only by one entity, which uses the coroutine can_peek to make sure
    the get_nowait will succeed.

    Another difference between a Queue and a QueueWithEnd is that the latter
    can also terminate (which is indicated by can_peek returning False and set
    by the finish coroutine) and raise exceptions (which raise from the
    get_nowait function and are set by the put_exception coroutine).
    """
    Type = enum.Enum("QueueWithEnd.Type", "notpeeked value end exception")

    def __init__(self, maxsize=0):
        # the underlying queue transports (type, value) tuples
        self._queue = asyncio.Queue(maxsize)
        self._ended = False
        self._flag = self.Type.notpeeked
        self._value = None

    def __repr__(self):
        if self._flag in (self.Type.value, self.Type.exception):
            details = " (%s)" % self._value
        else:
            details = ""
        return "<%s flag %s%s>" % (type(self).__name__, self._flag, details)

    # AsyncIterable interface

    @asyncio.coroutine
    def can_peek(self):
        if self._flag is not self.Type.notpeeked:
            return True
        self._flag, self._value = yield from self._queue.get()
        return self._flag is not self.Type.end

    def get_nowait(self):
        if self._flag in (self.Type.notpeeked, self.Type.end):
            raise asyncio.QueueEmpty()
        elif self._flag is self.Type.exception:
            # exceptions are terminal: the flag is not reset, so any further
            # get_nowait re-raises the same exception
            raise self._value
        else:
            self._flag = self.Type.notpeeked
            return self._value

    # feeder interface

    @asyncio.coroutine
    def put(self, value):
        if self._ended:
            raise asyncio.InvalidStateError("%s has already ended" % type(self).__name__)
        yield from self._queue.put((self.Type.value, value))

    @asyncio.coroutine
    def put_exception(self, value):
        if self._ended:
            raise asyncio.InvalidStateError("%s has already ended" % type(self).__name__)
        self._ended = True
        yield from self._queue.put((self.Type.exception, value))

    @asyncio.coroutine
    def finish(self):
        if self._ended:
            raise asyncio.InvalidStateError("%s has already ended" % type(self).__name__)
        self._ended = True
        yield from self._queue.put((self.Type.end, None))

    # a simple way to create a feeder with something like an explicit yield

    @classmethod
    def cogenerator(cls, maxsize=0):
        """Coroutine decorator that passes a callable `asyncyield` into the function
        as the first argument and returns a QueueWithEnd. It is implicitly
        finished when the coroutine returns.

        >>> @QueueWithEnd.cogenerator()
        ... def count_slowly(asyncyield, count_to):
        ...     for i in range(count_to):
        ...         yield from asyncio.sleep(1)
        ...         yield from asyncyield(i + 1)
        >>> counter = count_slowly(10)
        >>> while (yield from counter.can_peek()):
        ...     i = counter.get_nowait()
        ...     print("Current count is %d"%i)
        """

        def decorate(function):
            cofun = asyncio.coroutine(function)
            def wrapped(*args, **kwargs):
                result = cls(maxsize=maxsize)
                @asyncio.coroutine
                def guarding():
                    running = cofun(result.put, *args, **kwargs)
                    try:
                        yield from running
                    except Exception as e:
                        yield from result.put_exception(e)
                    else:
                        yield from result.finish()
                asyncio.Task(guarding())
                return result
            return wrapped
        return decorate

    # implementing the Future interface -- note that it's neither a Future by
    # inheritance, nor does it offer the complete Future interface; but it can
    # be used in `for value in (yield from ...):`

    def __iter__(self):
        result = []
        while (yield from self.can_peek()):
            result.append(self.get_nowait())
        return result

    # compatibility to the original `Its` class

    more = can_peek
    value = property(get_nowait)
    # another old name
    consume = get_nowait
