>> async def waitfor(self, keys, timeout=None): >> """Wait until a message comes in for any of the given key(s). Raise >> BusTimeoutError after the given timedelta.""" >> .... >> >> >> Internally, the waitfor method would use the add_timeout to resume itself >> and raise the BusTimeoutError exception after the given timeout, and this >> should be the only place where a periodic (system time based) sleep would >> occur. The notify/notifyall methods would also resume execution in waitfor() >> call(s), but they would provide the message for the waiter instead of >> raising an exception. > My first instinct is to suggest that you not reinvent the wheel and > point you at the asyncio.Condition class. However, it apparently > doesn't support setting a timeout on wait, which seems odd since the > threading.Condition class that it's based on does. You could use > asyncio.wait to wait for it with a timeout, but that wouldn't remove > the waiter from the Condition. Maybe this would be a useful feature > request + patch. I can submit a feature request, but I cannot patch the CPython implementation. :-)
In order to schedule a callback in the future, you would have to have a standard event loop interface for scheduling. We do have a base class asyncio.BaseEventLoop, but tornado's ioloop is NOT a descendant of it. (It is based on tornado.util.Configureable, which is based on "object"). BaseEventLoop.call_later and tornado.ioloop.IOLoop.add_timeout are similar, but also they are quite different. I'm confused. How they are working together? In order to have a timeout on the asyncio.Condition.wait, the Condition object would have to know how to wait for the given amount of time. But asyncio.call_after and tornado ioloop.add_timeout are fundamentally different. Aren't they? If I use multiple io loops (which is uncommon but possible), then how would the Condition object know what loop should be used for the callback? >> async def notify(self, key, message): >> """Notify a single waiter. Return if there was a waiter waiting for >> the key.""" >> if key in self._waiters: >> self._waiters[key][0].send((key, message)) >> return True >> else: >> return False > It looks like you're assuming that the waiter object will be a > coroutine and trying to call its send method directly rather than > going through the event loop? Well, not exactly. When a message comes in, an external event is generated (in this case: starts with a network I/O interrupt that continues as an event inside tornado's ioloop). So the call chain DOES start in the event loop, and it will eventually calls the send() method of the waiter directly. The call will continue to run in code that writes back response to the client waiting for it and finish that request. Yes, send() is called explicitely. But calling generator.send(None) is equivalent of next(generator), and we do that all the time. Why is that a bad idea? I do not see anywhere in the documentation that send() should not be called. Maybe I'm wrong. > That seems like a bad idea. In asyncio, > a coroutine is something that you can await or schedule with > loop.create_task(). Don't try to use those low-level methods. But here, you refer to BaseEventLoop.create_task(). I cannot use it with tornado. :-( In Tornado, most tasks are not created explicitely. They are started when an external interrupt (http request) comes in. There is also support for "add_timeout" but I guess it does not use asyncio.call_later. Frankly I don't know what is the difference in their implementation. The thing I want to achieve is "suspend me, give back execution to MY CURRENT event loop, and resume me when somebody wants to resume me explicitely (from the same event loop)". This is a basic feature, and I thought it could be implemented in a way that is independent of the event loop implementation. (Just like async, await, yield, send and throw are independent - they can be used with twisted, tornado or BaseEventLoop etc.) > I think a better approach would be to make the waiter a Future and > signal it by setting its result. Something like this, as a rough > sketch: > > async def waitfor(self, keys, timeout=None): > waiter = asyncio.Future() > for key in keys: > self._waiters[key].add(waiter) > handle = None > if timeout: > handle = asyncio.call_later(timeout, self._handle_timeout, waiter) Will this work if I use tornado's ioloop? I'm sorry but I do not see what is the difference between tornado ioloop.add_timeout and asyncio.call_later. Are they fundamentally different? Or can I just use asyncio.call_later and expect that it will work with any event loop implementation? > try: > return await waiter > finally: > # TODO: Use a context manager to add and remove the keys. > for key in keys: > self._waiters[key].discard(waiter) > if handle: > handle.cancel() > > def notify(self, key, message): > if key in self._waiters and self._waiters[key]: > waiter = next(iter(self._waiters[key])) > waiter.set_result((key, message)) I think this is what I needed. I'm going to try this tomorrow. -- https://mail.python.org/mailman/listinfo/python-list