It seems inevitable that if you use await twice you risk being cancelled in between. The solution is to only use a single await to do all the work, like asyncio.Queue does (and why not use that for your use case?). I don't think inventing a parallel API of synchronous callbacks is a good idea -- as you say, there's a good reason why callbacks are asynchronous, and having two subtly different ways of doing things seems more confusing than helpful. Async I/O is already complicated -- let's not make it more so.
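A minimal sketch of the single-await approach, assuming `asyncio.Queue` fits the use case. The names `consume_data(queue)` and `demo` are illustrative only and do not come from the original message. As far as I can tell from CPython's implementation, `Queue.get()` removes the item only after the waiting coroutine resumes, so a cancellation that lands between `put_nowait()` and the wakeup leaves the item in the queue for the next consumer:

```python
import asyncio


async def consume_data(queue):
    # Single await: no intermediate task sits between the caller and the queue.
    return await queue.get()


async def demo():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(consume_data(queue))
    await asyncio.sleep(0)        # let the consumer start and block in get()

    queue.put_nowait('data')      # wakes the waiting getter...
    task.cancel()                 # ...but cancel before the consumer resumes
    await asyncio.wait([task])    # wait for the cancellation to be processed

    # The cancelled consumer never took the item, so a later consumer gets it.
    item = await consume_data(queue)
    return task.cancelled(), item


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the first consumer ends up cancelled, yet the item is still available to the second call. If the consumer had already resumed before `cancel()` was called, the task would simply complete with the item instead. Either way the data is not dropped.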
On Tue, Jul 30, 2019 at 2:28 AM <aurelien.lambert...@gmail.com> wrote:

> In asyncio, when a task awaits another task (or future), it can be
> cancelled right after the awaited task has finished (before the callbacks
> have been processed). Thus, if the awaited task has consumed data, the
> data is lost.
>
> For instance, with the following code:
>
> import asyncio
>
> available_data = []
> data_ready = asyncio.Future()
>
> def feed_data(data):
>     global data_ready
>     available_data.append(data)
>     data_ready.set_result(None)
>     data_ready = asyncio.Future()
>
> async def consume_data():
>     while not available_data:
>         await asyncio.shield(data_ready)
>     return available_data.pop()
>
> async def wrapped_consumer():
>     task = asyncio.ensure_future(consume_data())
>     return await task
>
> If I perform those exact steps:
>
> async def test():
>     task = asyncio.ensure_future(wrapped_consumer())
>     await asyncio.sleep(0)
>     feed_data('data')
>     await asyncio.sleep(0)
>     task.cancel()
>     await asyncio.sleep(0)
>     print('task', task)
>     print('available_data', available_data)
>
> loop = asyncio.get_event_loop()
> loop.run_until_complete(test())
>
> Then I can see that the task has been cancelled despite the data being
> consumed. Since the result of `wrapped_consumer` cannot be retrieved, the
> data is forever lost.
>
> task <Task cancelled coro=<wrapped_consumer() done, defined at
> <ipython-input-1-de4ad193b1d0>:17>>
> available_data []
>
> This side effect does not happen when awaiting a coroutine, but coroutines
> are not as flexible as tasks. It happens when awaiting a `Future`, a
> `Task`, or any function like `asyncio.wait`, `asyncio.wait_for` or
> `asyncio.gather` (which all inherit from or use `Future`).
> There is then no way to do anything equivalent to:
>
> stop_future = asyncio.Future()
>
> async def wrapped_consumer2():
>     task = asyncio.ensure_future(consume_data())
>     try:
>         await asyncio.wait([task, stop_future])
>     finally:
>         task.cancel()
>         await asyncio.wait([task])
>     if not task.cancelled():
>         return task.result()
>     else:
>         raise RuntimeError('stopped')
>
> This is due to the Future calling the callback asynchronously:
> https://github.com/python/cpython/blob/3.6/Lib/asyncio/futures.py#L214
>
> for callback in callbacks:
>     self._loop.call_soon(callback, self)
>
> I propose to create synchronous versions of those, or a
> `synchronous_callback` parameter, that turns the callbacks of `Future`
> synchronous. I've experimented with a simple library, `syncio`, on CPython
> 3.6 to do this (it is harder to patch later versions due to the massive
> use of private methods).
> Basically, it needs to:
> 1) replace the `Future._schedule_callbacks` method with a synchronous version
> 2) fix `Task._step` to not fail when cleaning `_current_tasks` (
> https://github.com/python/cpython/blob/3.6/Lib/asyncio/tasks.py#L245)
> 3) rewrite all the functions to use synchronous futures instead of normal
> ones
>
> With that library, the previous functions are possible and intuitive:
>
> import syncio
>
> async def wrapped_consumer():
>     task = syncio.ensure_sync_future(consume_data())
>     return await task
>
> stop_future = asyncio.Future()
>
> async def wrapped_consumer2():
>     task = syncio.ensure_sync_future(consume_data())
>     try:
>         await syncio.sync_wait([task, stop_future])
>     finally:
>         task.cancel()
>         await asyncio.wait([task])
>     if not task.cancelled():
>         return task.result()
>     else:
>         raise RuntimeError('stopped')
>
> No need to use `syncio` anywhere else in the code, which makes it totally
> transparent for the end user.
>
> This "library" can be found here:
> https://github.com/aure-olli/aiokafka/blob/216b4ad3b4115bc9fa463e44fe23636bd63c5da4/syncio.py
> It implements `SyncFuture`, `SyncTask`, `ensure_sync_future`, `sync_wait`,
> `sync_wait_for`, `sync_gather` and `sync_shield`. It works with CPython 3.6
> only.
>
> To conclude:
> - asynchronous callbacks are preferable in most cases, but do not provide
>   a coherent cancelled status in specific cases
> - implementing a version with synchronous callbacks (or a
>   `synchronous_callback` parameter) is rather easy (however, step 2 needs
>   to be clarified; there is probably a cleaner way to fix this)
> - it is totally transparent for the end user, as synchronous callbacks are
>   totally compatible with asynchronous ones
> _______________________________________________
> Python-ideas mailing list -- python-ideas@python.org
> To unsubscribe send an email to python-ideas-le...@python.org
> https://mail.python.org/mailman3/lists/python-ideas.python.org/
> Message archived at
> https://mail.python.org/archives/list/python-ideas@python.org/message/E77XF3K4SRYFG3F6QLFBPP5YOB726DNS/
> Code of Conduct: http://python.org/psf/codeofconduct/
>

-- 
--Guido van Rossum (python.org/~guido)
*Pronouns: he/him/his **(why is my pronoun here?)*
<http://feministing.com/2015/02/03/how-using-they-as-a-singular-pronoun-can-change-the-world/>
_______________________________________________
Python-ideas mailing list -- python-ideas@python.org
To unsubscribe send an email to python-ideas-le...@python.org
https://mail.python.org/mailman3/lists/python-ideas.python.org/
Message archived at https://mail.python.org/archives/list/python-ideas@python.org/message/BD6QEHNIWBVGZ26FRQKDBXLGYRHKPJNW/
Code of Conduct: http://python.org/psf/codeofconduct/