commit:     04b1012594bfad1be719547e2c88a2dcf1051dc1
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Mar 21 06:54:47 2017 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Mar 24 20:32:11 2017 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=04b10125

EventLoop: implement call_soon for asyncio compat (bug 591760)

Since asyncio.AbstractEventLoop has no equivalent to the idle
callbacks implemented by the EventLoop.idle_add method, it is
necessary to implement the AbstractEventLoop.call_soon and
call_soon_threadsafe methods, so that idle_add usage can
eventually be eliminated.

X-Gentoo-bug: 591760
X-Gentoo-bug-url: https://bugs.gentoo.org/show_bug.cgi?id=591760
Acked-by: Brian Dolbec <dolsen <AT> gentoo.org>

 .../tests/util/eventloop/test_call_soon_fifo.py    | 30 ++++++++++
 pym/portage/util/_async/SchedulerInterface.py      |  5 +-
 pym/portage/util/_eventloop/EventLoop.py           | 67 +++++++++++++++++++++-
 3 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/eventloop/test_call_soon_fifo.py 
b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py
new file mode 100644
index 000000000..5ecc13f43
--- /dev/null
+++ b/pym/portage/tests/util/eventloop/test_call_soon_fifo.py
@@ -0,0 +1,30 @@
+# Copyright 2017 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import functools
+import random
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.futures import Future
+
+class CallSoonFifoTestCase(TestCase):
+
+       def testCallSoonFifo(self):
+
+               inputs = [random.random() for index in range(10)]
+               outputs = []
+               finished = Future()
+
+               def add_output(value):
+                       outputs.append(value)
+                       if len(outputs) == len(inputs):
+                               finished.set_result(True)
+
+               event_loop = global_event_loop()
+               for value in inputs:
+                       event_loop.call_soon(functools.partial(add_output, 
value))
+
+               event_loop.run_until_complete(finished)
+               self.assertEqual(inputs, outputs)

diff --git a/pym/portage/util/_async/SchedulerInterface.py 
b/pym/portage/util/_async/SchedulerInterface.py
index 2ab668ee4..6028fd90d 100644
--- a/pym/portage/util/_async/SchedulerInterface.py
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -13,8 +13,9 @@ class SchedulerInterface(SlotObject):
 
        _event_loop_attrs = ("IO_ERR", "IO_HUP", "IO_IN",
                "IO_NVAL", "IO_OUT", "IO_PRI",
-               "child_watch_add", "idle_add", "io_add_watch",
-               "iteration", "source_remove", "timeout_add")
+               "call_soon", "call_soon_threadsafe", "child_watch_add",
+               "idle_add", "io_add_watch", "iteration", "run_until_complete",
+               "source_remove", "timeout_add")
 
        __slots__ = _event_loop_attrs + ("_event_loop", "_is_background")
 

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index 8f13de377..308157bea 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -22,6 +22,7 @@ try:
 except ImportError:
        import dummy_threading as threading
 
+from portage import OrderedDict
 from portage.util import writemsg_level
 from ..SlotObject import SlotObject
 from .PollConstants import PollConstants
@@ -54,6 +55,38 @@ class EventLoop(object):
                __slots__ = ("args", "function", "calling", "interval", 
"source_id",
                        "timestamp")
 
+       class _handle(object):
+               """
+               A callback wrapper object, compatible with asyncio.Handle.
+               """
+               __slots__ = ("_callback_id", "_loop")
+
+               def __init__(self, callback_id, loop):
+                       self._callback_id = callback_id
+                       self._loop = loop
+
+               def cancel(self):
+                       """
+                       Cancel the call. If the callback is already canceled or 
executed,
+                       this method has no effect.
+                       """
+                       self._loop.source_remove(self._callback_id)
+
+       class _call_soon_callback(object):
+               """
+               Wraps a call_soon callback, and always returns False, since 
these
+               callbacks are only supposed to run once.
+               """
+               __slots__ = ("_args", "_callback")
+
+               def __init__(self, callback, args):
+                       self._callback = callback
+                       self._args = args
+
+               def __call__(self):
+                       self._callback(*self._args)
+                       return False
+
        def __init__(self, main=True):
                """
                @param main: If True then this is a singleton instance for use
@@ -70,7 +103,9 @@ class EventLoop(object):
                self._poll_event_handler_ids = {}
                # Increment id for each new handler.
                self._event_handler_id = 0
-               self._idle_callbacks = {}
+               # Use OrderedDict in order to emulate the FIFO queue behavior
+               # of the AbstractEventLoop.call_soon method.
+               self._idle_callbacks = OrderedDict()
                self._timeout_handlers = {}
                self._timeout_interval = None
 
@@ -399,6 +434,9 @@ class EventLoop(object):
                automatically removed from the list of event sources and will
                not be called again. This method is thread-safe.
 
+               The idle_add method is deprecated. Use the call_soon and
+               call_soon_threadsafe methods instead.
+
                @type callback: callable
                @param callback: a function to call
                @rtype: int
@@ -592,6 +630,33 @@ class EventLoop(object):
 
                return future.result()
 
+       def call_soon(self, callback, *args):
+               """
+               Arrange for a callback to be called as soon as possible. The 
callback
+               is called after call_soon() returns, when control returns to 
the event
+               loop.
+
+               This operates as a FIFO queue, callbacks are called in the 
order in
+               which they are registered. Each callback will be called exactly 
once.
+
+               Any positional arguments after the callback will be passed to 
the
+               callback when it is called.
+
+               An object compatible with asyncio.Handle is returned, which can
+               be used to cancel the callback.
+
+               @type callback: callable
+               @param callback: a function to call
+               @return: a handle which can be used to cancel the callback
+               @rtype: asyncio.Handle (or compatible)
+               """
+               return self._handle(self.idle_add(
+                       self._call_soon_callback(callback, args)), self)
+
+       # The call_soon method inherits thread safety from the idle_add method.
+       call_soon_threadsafe = call_soon
+
+
 _can_poll_device = None
 
 def can_poll_device():

Reply via email to