Caleb Hattingh <[email protected]> added the comment:
I'm interested in how this change would affect the pattern of shutting down a
queue-processing task.
How would one decide between whether to cancel the queue or the task? (I'm
asking for real, this is not an objection to the PR). For example, looking at
the two tests in the PR:
def test_cancel_get(self):
queue = asyncio.Queue(loop=self.loop)
getter = self.loop.create_task(queue.get())
test_utils.run_briefly(self.loop)
queue.cancel() # <---- HERE
test_utils.run_briefly(self.loop)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(getter)
This test would work exactly the same if the `getter` task was cancelled
instead right? Like this:
def test_cancel_get(self):
queue = asyncio.Queue(loop=self.loop)
getter = self.loop.create_task(queue.get())
test_utils.run_briefly(self.loop)
getter.cancel() # <---- HERE
test_utils.run_briefly(self.loop)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(getter)
So my initial reaction is that I'm not sure under what conditions it would be
more useful to cancel the queue instead of the task. I am very used to applying
cancellation to tasks rather than the queues they contain, so I might lack
imagination in this area. The idiom I've been using so far for consuming queues
looks roughly something like this:
async def consumer(q: asyncio.Queue):
while True:
try:
data = await q.get()
except asyncio.CancelledError:
q.put_nowait(None) # ignore QueueFull for this discussion
continue
try:
if not data:
logging.info('Queue shut down cleanly')
return # <------ The only way to leave the coro
<process data>
except Exception:
logging.exception('Unexpected exception:')
continue
finally:
q.task_done()
^^ With this pattern, I can shut down the `consumer` task either by cancelling
the task (internally it'll put a `None` on the queue) or by placing a `None` on
the queue outright from anywhere else. The key point is that in either case,
existing items on the queue will still get processed before the `None` is
consumed, terminating the task from the inside.
(A) If the queue itself is cancelled (as in the proposed PR), would it still be
possible to catch the `CancelledError` and continue processing whatever items
have already been placed onto the queue? (and in this case, I think I'd still
need to place a sentinel onto the queue to mark the "end"...is that correct?)
(B) The `task_done()` is important for app shutdown so that the application
shutdown process waits for all currently-pending queue items to be processed
before proceeding with the next shutdown step. So, if the queue itself is
cancelled (as in the proposed PR), what happens to the application-level call
to `await queue.join()` during the shutdown sequence, if a queue was cancelled
while there were still unprocessed items on the queue for which `task_done()`
had not been called?
It would be great to have an example of how the proposed `queue.cancel()` would
be used idiomatically, w.r.t. the two questions above. It might be intended
that the idiomatic usage of `queue.cancel()` is for situations where one
doesn't care about dropping items previously placed on the queue. Is that the
case?
----------
nosy: +cjrh
_______________________________________
Python tracker <[email protected]>
<https://bugs.python.org/issue37334>
_______________________________________
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com