In order to avoid event loop recursion, pass fetchlist_dict to
ManifestTask as a Future.

Bug: https://bugs.gentoo.org/653946
---
[PATCH v2] fixed _future_fetchlist to limit concurrent async_aux_get
           calls with async_iter_completed(max_jobs=1)

 .../ebuild/_parallel_manifest/ManifestScheduler.py | 108 ++++++++++++++++++---
 .../ebuild/_parallel_manifest/ManifestTask.py      |  22 +++++
 pym/portage/tests/dbapi/test_portdb_cache.py       |   1 +
 3 files changed, 117 insertions(+), 14 deletions(-)

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
index 38ac4825e..2f453afc2 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
@@ -4,9 +4,9 @@
 import portage
 from portage import os
 from portage.dep import _repo_separator
-from portage.exception import InvalidDependString
 from portage.localization import _
 from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util.futures.iter_completed import async_iter_completed
 from .ManifestTask import ManifestTask
 
 class ManifestScheduler(AsyncScheduler):
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
                                cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
                                if not cpv_list:
                                        continue
-                               fetchlist_dict = {}
-                               try:
-                                       for cpv in cpv_list:
-                                               fetchlist_dict[cpv] = \
-                                                       list(portdb.getFetchMap(cpv, mytree=mytree))
-                               except InvalidDependString as e:
-                                       portage.writemsg(
-                                               _("!!! %s%s%s: SRC_URI: %s\n") %
-                                               (cp, _repo_separator, repo_config.name, e),
-                                               noiselevel=-1)
-                                       self._error_count += 1
-                                       continue
 
+                               # Use _future_fetchlist(max_jobs=1), since we
+                               # spawn concurrent ManifestTask instances.
                                yield ManifestTask(cp=cp, distdir=distdir,
-                                       fetchlist_dict=fetchlist_dict, repo_config=repo_config,
+                                       fetchlist_dict=_future_fetchlist(
+                                               self._event_loop, portdb, repo_config, cp, cpv_list,
+                                               max_jobs=1),
+                                       repo_config=repo_config,
                                        gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
                                        force_sign_key=self._force_sign_key)
 
@@ -91,3 +84,90 @@ class ManifestScheduler(AsyncScheduler):
                                        noiselevel=-1)
 
                AsyncScheduler._task_exit(self, task)
+
+
+def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list,
+       max_jobs=None, max_load=None):
+       """
+       Asynchronous form of FetchlistDict, with max_jobs and max_load
+       parameters in order to control async_aux_get concurrency.
+
+       @param loop: event loop
+       @type loop: EventLoop
+       @param portdb: portdbapi instance
+       @type portdb: portdbapi
+       @param repo_config: repository configuration for a Manifest
+       @type repo_config: RepoConfig
+       @param cp: cp for a Manifest
+       @type cp: str
+       @param cpv_list: list of ebuild cpv values for a Manifest
+       @type cpv_list: list
+       @param max_jobs: max number of futures to process concurrently (default
+               is multiprocessing.cpu_count())
+       @type max_jobs: int
+       @param max_load: max load allowed when scheduling a new future,
+               otherwise schedule no more than 1 future at a time (default
+               is multiprocessing.cpu_count())
+       @type max_load: int or float
+       @return: a Future resulting in a Mapping compatible with FetchlistDict
+       @rtype: asyncio.Future (or compatible)
+       """
+       loop = getattr(loop, '_asyncio_wrapper', loop)
+       result = loop.create_future()
+       futures = {}
+
+       def future_generator():
+               for cpv in cpv_list:
+                       future = futures[cpv] = portdb.async_fetch_map(
+                               cpv, mytree=repo_config.location, loop=loop)
+                       yield future
+
+       completed_iter = async_iter_completed(
+               future_generator(),
+               max_jobs=max_jobs,
+               max_load=max_load,
+               loop=loop,
+       )
+
+       def handle_result(future_done_set):
+               if result.cancelled():
+                       return
+
+               try:
+                       handle_result.current_task = next(completed_iter)
+               except StopIteration:
+                       pass
+               else:
+                       handle_result.current_task.add_done_callback(handle_result)
+                       return
+
+               e = None
+               for future in futures.values():
+                       if (future.done() and future.exception() is not None):
+                               # Retrieve exceptions from all futures in order to
+                               # avoid triggering the event loop's error handler.
+                               e = future.exception()
+
+               if e is None:
+                       result.set_result(dict((k, list(v.result()))
+                               for k, v in futures.items()))
+               else:
+                       result.set_exception(e)
+
+       try:
+               handle_result.current_task = next(completed_iter)
+       except StopIteration:
+               handle_result.current_task = None
+               result.set_result({})
+       else:
+               handle_result.current_task.add_done_callback(handle_result)
+
+       def cancel_callback(result):
+               if (result.cancelled() and
+                       handle_result.current_task is not None and
+                       not handle_result.current_task.done()):
+                       handle_result.current_task.cancel()
+
+       result.add_done_callback(cancel_callback)
+
+       return result
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
index 0ee2b910d..6f5fe5b16 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
@@ -8,8 +8,12 @@ import subprocess
 from portage import os
 from portage import _unicode_encode, _encodings
 from portage.const import MANIFEST2_IDENTIFIERS
+from portage.dep import _repo_separator
+from portage.exception import InvalidDependString
+from portage.localization import _
 from portage.util import (atomic_ofstream, grablines,
        shlex_split, varexpand, writemsg)
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from _emerge.CompositeTask import CompositeTask
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
        def _start(self):
                self._manifest_path = os.path.join(self.repo_config.location,
                        self.cp, "Manifest")
+
+               self._start_task(
+                       AsyncTaskFuture(future=self.fetchlist_dict),
+                       self._start_with_fetchlist)
+
+       def _start_with_fetchlist(self, fetchlist_task):
+               if self._default_exit(fetchlist_task) != os.EX_OK:
+                       if not self.fetchlist_dict.cancelled():
+                               try:
+                                       self.fetchlist_dict.result()
+                               except InvalidDependString as e:
+                                       writemsg(
+                                               _("!!! %s%s%s: SRC_URI: %s\n") %
+                                               (self.cp, _repo_separator, self.repo_config.name, e),
+                                               noiselevel=-1)
+                       self._async_wait()
+                       return
+               self.fetchlist_dict = self.fetchlist_dict.result()
                manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
                        fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
                        scheduler=self.scheduler)
diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
index bd934460a..1f139b256 100644
--- a/pym/portage/tests/dbapi/test_portdb_cache.py
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
                portage_python = portage._python_interpreter
                egencache_cmd = (portage_python, "-b", "-Wd",
                        os.path.join(self.bindir, "egencache"),
+                       "--update-manifests", "--sign-manifests=n",
                        "--repo", "test_repo",
                        "--repositories-configuration", settings.repositories.config_string())
                python_cmd = (portage_python, "-b", "-Wd", "-c")
-- 
2.13.6


Reply via email to