I'm new to Python 3.5 async / await syntax. I would like to create a class (let's call it AsyncBus) that can be used to listen for keys, and to send messages for keys asynchronously.

class BusTimeoutError(Exception):
    pass


class AsyncBus(object):
    def __init__(self, add_timeout):
        """add_timeout should be a function that uses the current
        event loop implementation to call back"""
        ....

    async def notify(self, key, message):
        """Notify a single waiter about a key. Return if somebody was
        waiting for it."""
        ....

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        ....

    async def waitfor(self, keys, timeout=None):
        """Wait until a message comes in for any of the given key(s).
        Raise BusTimeoutError after the given timedelta."""
        ....

Internally, the waitfor method would use add_timeout to resume itself and raise the BusTimeoutError exception after the given timeout, and this should be the only place where a periodic (system time based) sleep would occur. The notify/notifyall methods would also resume execution in waitfor() call(s), but they would provide the message for the waiter instead of raising an exception.

Here is an example use case:

* Write a chat server, where all the users are running the chat client in a browser.
* The browser sends a long polling ajax request to the server, which returns any new messages immediately, or blocks for at most timeout=10 seconds before returning without any message. This long poll would be called in an infinite loop in the browser. Internally, long poll requests would end in bus.waitfor() calls on the server.
* When the user sends a new message to another user, bus.notifyall() is awaited. notifyall() awakens all bus.waitfor() calls, delivers the message to all clients, and finally gives back the number of clients notified to the sender of the message. The sender can see how many clients got the message.

I have examined code for long polling in other projects, and I have found that most of them use add_timeout to check for new messages periodically. I do not want to follow this practice.

* Let's say there are 1000 clients connected.
* If I use a lower timeout (say 0.1 seconds) for periodic checks, then the server will be woken up 1000 times in every 0.1 seconds, which is on average once every 0.0001 seconds. It will do nothing useful in 99.99% of that time. That seems to be bad.
* If I use a higher timeout (say 10 seconds), then messages won't be delivered for an average of 5 seconds, which is also bad.

So messages should NOT be delivered by periodic checks. They should be delivered from events triggered by incoming messages. In other words: when a new message comes in, it should wake up the clients waiting for messages (for the given user) and deliver the message instantaneously.

The question is: how to write the AsyncBus class? Here is what I have tried, but I don't know how to create the waiter object at the bottom.

import functools


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""
    pass


class AsyncBus(object):
    """Asynchronous notification bus."""

    def __init__(self, add_timeout):
        self._waiters = {}
        self._add_timeout = add_timeout

    async def notify(self, key, message):
        """Notify a single waiter. Return if there was a waiter
        waiting for the key."""
        if key in self._waiters:
            self._waiters[key][0].send((key, message))
            return True
        else:
            return False

    async def notifyall(self, key, message):
        """Notify all waiters. Return the number of waiters notified."""
        if key in self._waiters:
            # Get all waiters
            waiters = self._waiters[key]
            for waiter in waiters:
                # Send the message to the waiter
                waiter.send((key, message))
            return len(waiters)
        else:
            return 0

    async def waitfor(self, keys, timeout=None):
        """Wait for keys.

        :arg keys: An iterable of immutable keys.
        :arg timeout: Raise BusTimeoutError if nothing hits the bus
            for this amount of time. None means: wait indefinitely.
            It should be a datetime.timedelta object.
        """
        # Register for keys
        if not keys:
            raise Exception("Need some keys to wait for...")

        waiter = ?????????????????????????????

        for key in keys:
            if key in self._waiters:
                self._waiters[key].append(waiter)
            else:
                self._waiters[key] = [waiter]
        try:
            # Add timeout and wake me up if nothing comes in.
            if timeout:
                self._add_timeout(
                    timeout,
                    functools.partial(waiter.throw, BusTimeoutError))
            return await waiter
        finally:
            for key in keys:
                if key in self._waiters:
                    self._waiters[key].remove(waiter)
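
A minimal sketch of one way the missing waiter could be created, under assumptions that are not part of the post above: the event loop is plain asyncio (Python 3.7+ for get_running_loop), the waiter is an asyncio.Future, timeouts are plain seconds rather than a datetime.timedelta, and asyncio.wait_for stands in for the add_timeout callback. The class name FutureBus and the demo() helper are hypothetical, and notify() is left out for brevity.

```python
import asyncio


class BusTimeoutError(Exception):
    """Raised when the waiter has been waiting for too long."""


class FutureBus:
    """Hypothetical variant of AsyncBus whose waiters are asyncio futures."""

    def __init__(self):
        self._waiters = {}  # key -> list of pending futures

    async def notifyall(self, key, message):
        """Wake every waiter registered for key; return how many were woken."""
        count = 0
        for waiter in list(self._waiters.get(key, ())):
            if not waiter.done():  # skip waiters already woken or timed out
                waiter.set_result((key, message))
                count += 1
        return count

    async def waitfor(self, keys, timeout=None):
        """Wait for a message on any key. timeout is in seconds (assumption)."""
        keys = set(keys)
        if not keys:
            raise ValueError("Need some keys to wait for...")
        # The waiter: a bare future that notifyall() can resolve.
        waiter = asyncio.get_running_loop().create_future()
        for key in keys:
            self._waiters.setdefault(key, []).append(waiter)
        try:
            # wait_for arms a single timer; nothing polls periodically.
            return await asyncio.wait_for(waiter, timeout)
        except asyncio.TimeoutError:
            raise BusTimeoutError() from None
        finally:
            # Unregister, dropping keys that have no waiters left.
            for key in keys:
                self._waiters[key].remove(waiter)
                if not self._waiters[key]:
                    del self._waiters[key]


async def demo():
    bus = FutureBus()
    # Two long-poll style waiters on the same key.
    tasks = [asyncio.ensure_future(bus.waitfor(["alice"], timeout=1.0))
             for _ in range(2)]
    await asyncio.sleep(0.01)  # give both waiters time to register
    delivered = await bus.notifyall("alice", "hello")
    results = await asyncio.gather(*tasks)
    try:
        await bus.waitfor(["bob"], timeout=0.01)  # nobody notifies "bob"
        timed_out = False
    except BusTimeoutError:
        timed_out = True
    return delivered, results, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With this shape, each waitfor() call costs one future and at most one timer, and notifyall() wakes the waiters directly from the sender's call, so no periodic check is involved. Whether the same idea maps onto a particular add_timeout implementation is not verified here.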
-- https://mail.python.org/mailman/listinfo/python-list