In asyncio, when a task awaits another task (or future), it can be 
cancelled right after the awaited task has finished (before the callbacks have 
been processed). Thus, if the awaited task has consumed data, the data is lost.

For instance, with the following code:

    import asyncio

    available_data = []
    data_ready = asyncio.Future()

    def feed_data(data):
        global data_ready
        available_data.append(data)
        data_ready.set_result(None)
        data_ready = asyncio.Future()

    async def consume_data():
        while not available_data:
            await asyncio.shield(data_ready)
        return available_data.pop()

    async def wrapped_consumer():
        task = asyncio.ensure_future(consume_data())
        return await task

If I perform those exact steps:

    async def test():
        task = asyncio.ensure_future(wrapped_consumer())
        await asyncio.sleep(0)
        feed_data('data')
        await asyncio.sleep(0)
        task.cancel()
        await asyncio.sleep(0)
        print('task', task)
        print('available_data', available_data)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())

Then I can see that the task has been cancelled despite the data being 
consumed. Since the result of `wrapped_consumer` cannot be retrieved, the data 
is forever lost.

    task <Task cancelled coro=<wrapped_consumer() done, defined at <ipython-input-1-de4ad193b1d0>:17>>
    available_data []

This side effect does not happen when awaiting a coroutine, but coroutines are 
not as flexible as tasks. It happens when awaiting a `Future`, a `Task`, or the 
result of any function like `asyncio.wait`, `asyncio.wait_for` or 
`asyncio.gather` (which all inherit from or use `Future`). There is then no way 
to do anything equivalent to:

    stop_future = asyncio.Future()
    async def wrapped_consumer2():
        task = asyncio.ensure_future(consume_data())
        try:
            await asyncio.wait([task, stop_future])
        finally:
            task.cancel()
            await asyncio.wait([task])
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')
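
For comparison, here is a minimal sketch of the coroutine case mentioned 
above, where `wrapped_consumer` awaits `consume_data()` directly instead of 
wrapping it in a task. It is a self-contained variant of the test, not the 
original code: it assumes Python 3.7+ (for `asyncio.run` and 
`asyncio.get_running_loop`) and keeps the state local to `main()` rather than 
in globals.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    available_data = []
    data_ready = loop.create_future()

    async def consume_data():
        while not available_data:
            await asyncio.shield(data_ready)
        return available_data.pop()

    async def wrapped_consumer():
        # Await the coroutine directly: no intermediate task is created.
        return await consume_data()

    task = asyncio.ensure_future(wrapped_consumer())
    await asyncio.sleep(0)           # consumer starts waiting on data_ready
    available_data.append('data')    # same effect as feed_data('data')
    data_ready.set_result(None)
    await asyncio.sleep(0)
    task.cancel()
    await asyncio.sleep(0)
    return task.cancelled(), available_data

cancelled, remaining = asyncio.run(main())
print(cancelled, remaining)
```

Here the cancellation is raised inside `consume_data` before it reaches 
`available_data.pop()`, so the task should end up cancelled while the data 
stays available.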

The lost result is due to `Future` scheduling its callbacks asynchronously 
with `call_soon`:
https://github.com/python/cpython/blob/3.6/Lib/asyncio/futures.py#L214

        for callback in callbacks:
            self._loop.call_soon(callback, self)
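
The effect of `call_soon` can be observed with a small stand-alone check. This 
is only an illustration, assuming Python 3.7+ for `asyncio.run`; the names 
`calls`, `seen_immediately` and `seen_after_yield` are made up for the example.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    calls = []
    fut.add_done_callback(lambda f: calls.append(f.result()))

    fut.set_result('data')
    seen_immediately = list(calls)   # callback is only scheduled at this point

    await asyncio.sleep(0)           # let the loop run the scheduled callback
    seen_after_yield = list(calls)
    return seen_immediately, seen_after_yield

seen_immediately, seen_after_yield = asyncio.run(main())
print(seen_immediately, seen_after_yield)
```

Between `set_result` and the next loop iteration the future is done but its 
callbacks have not run yet, which is the window in which a waiting task can 
still be cancelled.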

I propose to create synchronous versions of those, or a `synchronous_callback` 
parameter, that makes the callbacks of `Future` synchronous. I've experimented 
with a simple library, `syncio`, on CPython 3.6 to do this (it is harder to 
patch later versions due to the heavy use of private methods).
Basically, it needs to:
1) replace the `Future._schedule_callbacks` method with a synchronous version
2) fix `Task._step` so that it does not fail when cleaning `_current_tasks` 
(https://github.com/python/cpython/blob/3.6/Lib/asyncio/tasks.py#L245)
3) rewrite all the functions to use synchronous futures instead of normal ones
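
To give an idea of step 1 only, here is a rough sketch that uses public 
methods instead of patching `_schedule_callbacks`. `SyncCallbackFuture` and 
`_sync_callbacks` are hypothetical names for this illustration, not the 
`syncio` implementation. It does not cover steps 2 and 3, so it is not meant 
to be awaited from a task. It assumes Python 3.7+.

```python
import asyncio

class SyncCallbackFuture(asyncio.Future):
    """Hypothetical sketch: done-callbacks run inline, not via call_soon."""

    def __init__(self, *, loop=None):
        super().__init__(loop=loop)
        self._sync_callbacks = []

    def add_done_callback(self, fn, **kwargs):
        # A `context=` keyword is accepted but ignored in this sketch.
        if self.done():
            fn(self)
        else:
            self._sync_callbacks.append(fn)

    def _run_sync_callbacks(self):
        # Call the callbacks right away, in registration order.
        callbacks, self._sync_callbacks = self._sync_callbacks, []
        for fn in callbacks:
            fn(self)

    def set_result(self, result):
        super().set_result(result)
        self._run_sync_callbacks()

    def set_exception(self, exception):
        super().set_exception(exception)
        self._run_sync_callbacks()

    def cancel(self, *args, **kwargs):
        cancelled = super().cancel(*args, **kwargs)
        if cancelled:
            self._run_sync_callbacks()
        return cancelled

async def main():
    fut = SyncCallbackFuture(loop=asyncio.get_running_loop())
    calls = []
    fut.add_done_callback(lambda f: calls.append(f.result()))
    fut.set_result('data')
    return list(calls)               # no loop iteration in between

calls_after_set_result = asyncio.run(main())
print(calls_after_set_result)
```

With inline callbacks there is no gap between the future completing and its 
callbacks running. Waking up a task from inside `set_result` is the part that 
runs into the `_current_tasks` bookkeeping of step 2.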

With that library, the previous functions are possible and intuitive:

    import syncio
    
    async def wrapped_consumer():
        task = syncio.ensure_sync_future(consume_data())
        return await task

    stop_future = asyncio.Future()
    async def wrapped_consumer2():
        task = syncio.ensure_sync_future(consume_data())
        try:
            await syncio.sync_wait([task, stop_future])
        finally:
            task.cancel()
            await asyncio.wait([task])
        if not task.cancelled():
            return task.result()
        else:
            raise RuntimeError('stopped')

No need to use `syncio` anywhere else in the code, which makes it totally 
transparent for the end user.

This "library" can be found here: 
https://github.com/aure-olli/aiokafka/blob/216b4ad3b4115bc9fa463e44fe23636bd63c5da4/syncio.py
It implements `SyncFuture`, `SyncTask`, `ensure_sync_future`, `sync_wait`, 
`sync_wait_for`, `sync_gather` and `sync_shield`. It works with CPython 3.6 
only.

To conclude:
- asynchronous callbacks are preferable in most cases, but do not provide a 
coherent cancelled status in specific cases
- implementing a version with synchronous callbacks (or a 
`synchronous_callback` parameter) is rather easy (however, step 2 needs to be 
clarified; there is probably a cleaner way to fix this)
- it is totally transparent for the end user, as synchronous callbacks are 
totally compatible with asynchronous ones