Use call_soon to schedule the _termination_check callback when needed.
The previous idle_add usage was relatively inefficient, because it
scheduled the _termination_check callback to be called in every
iteration of the event loop.

Add a _cleanup method to handle cleanup of callbacks registered with
the global event loop. Since the terminate method is thread-safe and it
interacts with self._term_check_handle, use this variable only while
holding a lock.
---
 pym/_emerge/PollScheduler.py              | 57 +++++++++++++++++++++++--------
 pym/_emerge/Scheduler.py                  |  7 ++--
 pym/portage/util/_async/AsyncScheduler.py | 16 ++++-----
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index b118ac1..569879b 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -25,8 +25,10 @@ class PollScheduler(object):
                        a non-main thread)
                @type main: bool
                """
+               self._term_rlock = threading.RLock()
                self._terminated = threading.Event()
                self._terminated_tasks = False
+               self._term_check_handle = None
                self._max_jobs = 1
                self._max_load = None
                self._scheduling = False
@@ -44,6 +46,21 @@ class PollScheduler(object):
        def _is_background(self):
                return self._background
 
+       def _cleanup(self):
+               """
+               Cleanup any callbacks that have been registered with the global
+               event loop.
+               """
+               # The self._term_check_handle attribute requires locking
+               # since it's modified by the thread safe terminate method.
+               with self._term_rlock:
+                       if self._term_check_handle not in (None, False):
+                               self._term_check_handle.cancel()
+                       # This prevents the terminate method from scheduling
+                       # any more callbacks (since _cleanup must eliminate all
+                       # callbacks in order to ensure complete cleanup).
+                       self._term_check_handle = False
+
        def terminate(self):
                """
                Schedules asynchronous, graceful termination of the scheduler
@@ -51,26 +68,36 @@ class PollScheduler(object):
 
                This method is thread-safe (and safe for signal handlers).
                """
-               self._terminated.set()
+               with self._term_rlock:
+                       if self._term_check_handle is None:
+                               self._terminated.set()
+                               self._term_check_handle = self._event_loop.call_soon_threadsafe(
+                                       self._termination_check, True)
 
-       def _termination_check(self):
+       def _termination_check(self, retry=False):
                """
                Calls _terminate_tasks() if appropriate. It's guaranteed not to
-               call it while _schedule_tasks() is being called. The check should
-               be executed for each iteration of the event loop, for response to
-               termination signals at the earliest opportunity. It always returns
-               True, for continuous scheduling via idle_add.
+               call it while _schedule_tasks() is being called. This method must
+               only be called via the event loop thread.
+
+               @param retry: If True then reschedule if scheduling state prevents
+                       immediate termination.
+               @type retry: bool
                """
-               if not self._scheduling and \
-                       self._terminated.is_set() and \
+               if self._terminated.is_set() and \
                        not self._terminated_tasks:
-                       self._scheduling = True
-                       try:
-                               self._terminated_tasks = True
-                               self._terminate_tasks()
-                       finally:
-                               self._scheduling = False
-               return True
+                       if not self._scheduling:
+                               self._scheduling = True
+                               try:
+                                       self._terminated_tasks = True
+                                       self._terminate_tasks()
+                               finally:
+                                       self._scheduling = False
+
+                       elif retry:
+                               with self._term_rlock:
+                                       self._term_check_handle = self._event_loop.call_soon(
+                                               self._termination_check, True)
 
        def _terminate_tasks(self):
                """
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 71fe75f..58ff971 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
                                else:
                                        signal.signal(signal.SIGCONT, signal.SIG_DFL)
 
+                       self._termination_check()
                        if received_signal:
                                sys.exit(received_signal[0])
 
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
                                if isinstance(x, Package) and x.operation == "merge"])
                        self._status_display.maxval = self._pkg_count.maxval
 
+               # Cleanup any callbacks that have been registered with the global
+               # event loop by calls to the terminate method.
+               self._cleanup()
+
                self._logger.log(" *** Finished. Cleaning up...")
 
                if failed_pkgs:
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
                blocker_db.discardBlocker(pkg)
 
        def _main_loop(self):
-               term_check_id = self._event_loop.idle_add(self._termination_check)
                loadavg_check_id = None
                if self._max_load is not None and \
                        self._loadavg_latency is not None and \
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
                        while self._is_work_scheduled():
                                self._event_loop.iteration()
                finally:
-                       self._event_loop.source_remove(term_check_id)
                        if loadavg_check_id is not None:
                                self._event_loop.source_remove(loadavg_check_id)
 
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 9b96c6f..3deb6cb 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
                self._error_count = 0
                self._running_tasks = set()
                self._remaining_tasks = True
-               self._term_check_id = None
                self._loadavg_check_id = None
 
        def _poll(self):
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
                self._schedule()
 
        def _start(self):
-               self._term_check_id = self._event_loop.idle_add(self._termination_check)
                if self._max_load is not None and \
                        self._loadavg_latency is not None and \
                        (self._max_jobs is True or self._max_jobs > 1):
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
                                self._loadavg_latency, self._schedule)
                self._schedule()
 
+       def _cleanup(self):
+               super(AsyncScheduler, self)._cleanup()
+               if self._loadavg_check_id is not None:
+                       self._event_loop.source_remove(self._loadavg_check_id)
+                       self._loadavg_check_id = None
+
        def _wait(self):
                # Loop while there are jobs to be scheduled.
                while self._keep_scheduling():
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
                while self._is_work_scheduled():
                        self._event_loop.iteration()
 
-               if self._term_check_id is not None:
-                       self._event_loop.source_remove(self._term_check_id)
-                       self._term_check_id = None
-
-               if self._loadavg_check_id is not None:
-                       self._event_loop.source_remove(self._loadavg_check_id)
-                       self._loadavg_check_id = None
+               self._cleanup()
 
                if self._error_count > 0:
                        self.returncode = 1
-- 
2.10.2


Reply via email to