commit: 04b1012594bfad1be719547e2c88a2dcf1051dc1 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Tue Mar 21 06:54:47 2017 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Fri Mar 24 20:32:11 2017 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=04b10125
EventLoop: implement call_soon for asyncio compat (bug 591760) Since asyncio.AbstractEventLoop has no equivalent to the idle callbacks implemented by the EventLoop.idle_add method, it is necessary to implement the AbstractEventLoop.call_soon and call_soon_threadsafe methods, so that idle_add usage can eventually be eliminated. X-Gentoo-bug: 591760 X-Gentoo-bug-url: https://bugs.gentoo.org/show_bug.cgi?id=591760 Acked-by: Brian Dolbec <dolsen <AT> gentoo.org> .../tests/util/eventloop/test_call_soon_fifo.py | 30 ++++++++++ pym/portage/util/_async/SchedulerInterface.py | 5 +- pym/portage/util/_eventloop/EventLoop.py | 67 +++++++++++++++++++++- 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py new file mode 100644 index 000000000..5ecc13f43 --- /dev/null +++ b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py @@ -0,0 +1,30 @@ +# Copyright 2017 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import functools +import random + +from portage import os +from portage.tests import TestCase +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures.futures import Future + +class CallSoonFifoTestCase(TestCase): + + def testCallSoonFifo(self): + + inputs = [random.random() for index in range(10)] + outputs = [] + finished = Future() + + def add_output(value): + outputs.append(value) + if len(outputs) == len(inputs): + finished.set_result(True) + + event_loop = global_event_loop() + for value in inputs: + event_loop.call_soon(functools.partial(add_output, value)) + + event_loop.run_until_complete(finished) + self.assertEqual(inputs, outputs) diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py index 2ab668ee4..6028fd90d 100644 --- a/pym/portage/util/_async/SchedulerInterface.py +++ b/pym/portage/util/_async/SchedulerInterface.py @@ -13,8 +13,9 @@ class SchedulerInterface(SlotObject): _event_loop_attrs = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI", - "child_watch_add", "idle_add", "io_add_watch", - "iteration", "source_remove", "timeout_add") + "call_soon", "call_soon_threadsafe", "child_watch_add", + "idle_add", "io_add_watch", "iteration", "run_until_complete", + "source_remove", "timeout_add") __slots__ = _event_loop_attrs + ("_event_loop", "_is_background") diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 8f13de377..308157bea 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -22,6 +22,7 @@ try: except ImportError: import dummy_threading as threading +from portage import OrderedDict from portage.util import writemsg_level from ..SlotObject import SlotObject from .PollConstants import PollConstants @@ -54,6 +55,38 @@ class EventLoop(object): __slots__ = ("args", "function", "calling", "interval", "source_id", "timestamp") + class _handle(object): + """ + A callback wrapper object, compatible with asyncio.Handle. + """ + __slots__ = ("_callback_id", "_loop") + + def __init__(self, callback_id, loop): + self._callback_id = callback_id + self._loop = loop + + def cancel(self): + """ + Cancel the call. If the callback is already canceled or executed, + this method has no effect. + """ + self._loop.source_remove(self._callback_id) + + class _call_soon_callback(object): + """ + Wraps a call_soon callback, and always returns False, since these + callbacks are only supposed to run once. + """ + __slots__ = ("_args", "_callback") + + def __init__(self, callback, args): + self._callback = callback + self._args = args + + def __call__(self): + self._callback(*self._args) + return False + def __init__(self, main=True): """ @param main: If True then this is a singleton instance for use @@ -70,7 +103,9 @@ class EventLoop(object): self._poll_event_handler_ids = {} # Increment id for each new handler. self._event_handler_id = 0 - self._idle_callbacks = {} + # Use OrderedDict in order to emulate the FIFO queue behavior + # of the AbstractEventLoop.call_soon method. + self._idle_callbacks = OrderedDict() self._timeout_handlers = {} self._timeout_interval = None @@ -399,6 +434,9 @@ class EventLoop(object): automatically removed from the list of event sources and will not be called again. This method is thread-safe. + The idle_add method is deprecated. Use the call_soon and + call_soon_threadsafe methods instead. + @type callback: callable @param callback: a function to call @rtype: int @@ -592,6 +630,33 @@ class EventLoop(object): return future.result() + def call_soon(self, callback, *args): + """ + Arrange for a callback to be called as soon as possible. The callback + is called after call_soon() returns, when control returns to the event + loop. + + This operates as a FIFO queue, callbacks are called in the order in + which they are registered. Each callback will be called exactly once. + + Any positional arguments after the callback will be passed to the + callback when it is called. + + An object compatible with asyncio.Handle is returned, which can + be used to cancel the callback. + + @type callback: callable + @param callback: a function to call + @return: a handle which can be used to cancel the callback + @rtype: asyncio.Handle (or compatible) + """ + return self._handle(self.idle_add( + self._call_soon_callback(callback, args)), self) + + # The call_soon method inherits thread safety from the idle_add method. + call_soon_threadsafe = call_soon + + _can_poll_device = None def can_poll_device():