On Thu, Oct 15, 2015 at 5:25 AM, Nagy László Zsolt <gand...@shopzeus.com> wrote:
> I'm new to Python 3.5 async / await syntax. I would like to create a class -
> let's call it AsyncBus - that can be used to listen for keys, and send
> messages for keys asynchronously.
>
> class BusTimeoutError(Exception):
>     pass
>
> class AsyncBus(object):
>     def __init__(self, add_timeout):
>         """add_timeout should be a function that uses the current event loop
>         implementation to call back"""
>         ....
>
>     async def notify(self, key, message):
>         """Notify a single waiter about a key. Return if somebody was
>         waiting for it."""
>         ....
>
>     async def notifyall(self, key, message):
>         """Notify all waiters. Return the number of waiters notified."""
>         ....
>
>     async def waitfor(self, keys, timeout=None):
>         """Wait until a message comes in for any of the given key(s). Raise
>         BusTimeoutError after the given timedelta."""
>         ....
>
>
> Internally, the waitfor method would use the add_timeout to resume itself
> and raise the BusTimeoutError exception after the given timeout, and this
> should be the only place where a periodic (system time based) sleep would
> occur. The notify/notifyall methods would also resume execution in waitfor()
> call(s), but they would provide the message for the waiter instead of
> raising an exception.

My first instinct is to suggest that you not reinvent the wheel and point
you at the asyncio.Condition class. However, it apparently doesn't support
setting a timeout on wait, which seems odd since the threading.Condition
class that it's based on does. You could use asyncio.wait to wait for it
with a timeout, but that wouldn't remove the waiter from the Condition.
Maybe this would be a useful feature request + patch.

> Question is: how to write the AsyncBus class? Here is what I have tried -
> but I don't know how to create the waiter object at the bottom.
>
>
> class BusTimeoutError(Exception):
>     """Raised when the waiter has been waiting for too long."""
>     pass
>
>
> class AsyncBus(object):
>     """Asynchronous notification bus."""
>
>     def __init__(self, add_timeout):
>         self._waiters = {}
>         self._add_timeout = add_timeout
>
>     async def notify(self, key, message):
>         """Notify a single waiter. Return if there was a waiter waiting for
>         the key."""
>         if key in self._waiters:
>             self._waiters[key][0].send((key, message))
>             return True
>         else:
>             return False

It looks like you're assuming that the waiter object will be a coroutine
and trying to call its send method directly rather than going through
the event loop? That seems like a bad idea. In asyncio, a coroutine is
something that you can await or schedule with loop.create_task(). Don't
try to drive it yourself with low-level methods like send().

I think a better approach would be to make the waiter a Future and
signal it by setting its result. Something like this, as a rough sketch:

    async def waitfor(self, keys, timeout=None):
        loop = asyncio.get_event_loop()
        waiter = asyncio.Future()
        for key in keys:
            # setdefault, since _waiters starts out as a plain dict
            self._waiters.setdefault(key, set()).add(waiter)
        handle = None
        if timeout:
            # call_later is a loop method and takes a delay in seconds
            handle = loop.call_later(timeout, self._handle_timeout, waiter)
        try:
            return await waiter
        finally:
            # TODO: Use a context manager to add and remove the keys.
            for key in keys:
                self._waiters[key].discard(waiter)
            if handle:
                handle.cancel()

    def notify(self, key, message):
        # Skip waiters that were already signalled but haven't resumed yet
        for waiter in self._waiters.get(key, ()):
            if not waiter.done():
                waiter.set_result((key, message))
                return True
        return False

    def _handle_timeout(self, waiter):
        # The waiter may have been notified just before the timeout fired
        if not waiter.done():
            waiter.set_exception(BusTimeoutError())

--
https://mail.python.org/mailman/listinfo/python-list
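
For what it's worth, here is the Future-based idea filled out into something you can actually run, as a minimal sketch rather than a finished design. Some assumptions of mine that are not in the original thread: it targets Python 3.7+ (asyncio.run and asyncio.get_running_loop), the timeout is a plain number of seconds rather than a timedelta, the add_timeout constructor argument is dropped in favour of loop.call_later, and demo() is a hypothetical driver that exists only to exercise the class.

```python
import asyncio


class BusTimeoutError(Exception):
    """Raised when a waiter has been waiting for too long."""


class AsyncBus:
    """Future-based notification bus (illustrative sketch)."""

    def __init__(self):
        self._waiters = {}  # key -> set of pending Futures

    async def waitfor(self, keys, timeout=None):
        loop = asyncio.get_running_loop()
        waiter = loop.create_future()
        for key in keys:
            self._waiters.setdefault(key, set()).add(waiter)
        handle = None
        if timeout is not None:
            # Assumption: timeout is given in seconds.
            handle = loop.call_later(timeout, self._handle_timeout, waiter)
        try:
            return await waiter
        finally:
            # Always unregister, whether we were notified or timed out.
            for key in keys:
                self._waiters[key].discard(waiter)
            if handle is not None:
                handle.cancel()

    def notify(self, key, message):
        # Signal the first waiter that has not been resolved yet.
        for waiter in self._waiters.get(key, ()):
            if not waiter.done():
                waiter.set_result((key, message))
                return True
        return False

    def _handle_timeout(self, waiter):
        if not waiter.done():
            waiter.set_exception(BusTimeoutError())


async def demo():
    """Hypothetical driver: one successful notify, then one timeout."""
    bus = AsyncBus()
    # Start a waiter, let it register itself, then notify it.
    task = asyncio.ensure_future(bus.waitfor(["a", "b"], timeout=1.0))
    await asyncio.sleep(0)
    delivered = bus.notify("b", "hello")
    result = await task
    # With nobody notifying, the timeout fires instead.
    try:
        await bus.waitfor(["a"], timeout=0.01)
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return delivered, result, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Run as a script, this should print (True, ('b', 'hello'), True). Note that notify is a plain method here, not a coroutine: setting a Future's result never needs to suspend, so there is nothing for it to await.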