On Wed, Apr 14, 2021 at 01:50:37PM -0400, John Snow wrote: > On 4/14/21 1:44 AM, Stefan Hajnoczi wrote: > > On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote: > > > + async def _execute(self, msg: Message) -> object: > > > + """ > > > + The same as `execute_msg()`, but without safety mechanisms. > > > + > > > + Does not assign an execution ID and does not check that the form > > > + of the message being sent is valid. > > > + > > > + This method *Requires* an 'id' parameter to be set on the > > > + message, it will not set one for you like `execute()` or > > > + `execute_msg()`. > > > + > > > + Do not use "__aqmp#00000" style IDs, use something else to avoid > > > + potential clashes. If this ID clashes with an ID presently > > > + in-use or otherwise clashes with the auto-generated IDs, the > > > + response routing mechanisms in _on_message may very well fail > > > + loudly enough to cause the entire loop to crash. > > > + > > > + The ID should be a str; or at least something JSON > > > + serializable. It *must* be hashable. > > > + """ > > > + exec_id = cast(str, msg['id']) > > > + self.logger.debug("Execute(%s): '%s'", exec_id, > > > + msg.get('execute', msg.get('exec-oob'))) > > > + > > > + queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1) > > > + task = create_task(self._bh_execute(msg, queue)) > > > > We're already in a coroutine, can we await queue.get() ourselves instead > > of creating a new task? > > > > I guess this is done in order to use Task.cancel() in _bh_disconnect() > > but it seems simpler to use queue both for success and cancellation. > > Fewer tasks are easier to reason about. > > > > ...queues do not have a cancellation signal :( :( :( :( > > There's no way to "cancel" a queue: > https://docs.python.org/3/library/asyncio-queue.html#queue > > You *could* craft a special message and inject an exception into the queue > to notify the reader that the message will never arrive, but it feels like > working against the intended mechanism of that primitive. It really feels > like it wants to be wrapped in a *task*.
That's what I meant by "it seems simpler to use the queue both for success and cancellation". Just queue a message that says the execution has been cancelled. Stefan
signature.asc
Description: PGP signature