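To avoid event loop recursion, ManifestScheduler no longer builds fetchlist_dict synchronously with portdb.getFetchMap; instead it passes fetchlist_dict to ManifestTask as a Future, and ManifestTask waits for that Future to complete before starting ManifestProcess.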
Bug: https://bugs.gentoo.org/653946 --- [PATCH v2] fixed _future_fetchlist to limit concurrent async_aux_get calls with async_iter_completed(max_jobs=1) .../ebuild/_parallel_manifest/ManifestScheduler.py | 108 ++++++++++++++++++--- .../ebuild/_parallel_manifest/ManifestTask.py | 22 +++++ pym/portage/tests/dbapi/test_portdb_cache.py | 1 + 3 files changed, 117 insertions(+), 14 deletions(-) diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py index 38ac4825e..2f453afc2 100644 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py @@ -4,9 +4,9 @@ import portage from portage import os from portage.dep import _repo_separator -from portage.exception import InvalidDependString from portage.localization import _ from portage.util._async.AsyncScheduler import AsyncScheduler +from portage.util.futures.iter_completed import async_iter_completed from .ManifestTask import ManifestTask class ManifestScheduler(AsyncScheduler): @@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler): cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) if not cpv_list: continue - fetchlist_dict = {} - try: - for cpv in cpv_list: - fetchlist_dict[cpv] = \ - list(portdb.getFetchMap(cpv, mytree=mytree)) - except InvalidDependString as e: - portage.writemsg( - _("!!! %s%s%s: SRC_URI: %s\n") % - (cp, _repo_separator, repo_config.name, e), - noiselevel=-1) - self._error_count += 1 - continue + # Use _future_fetchlist(max_jobs=1), since we + # spawn concurrent ManifestTask instances. 
yield ManifestTask(cp=cp, distdir=distdir, - fetchlist_dict=fetchlist_dict, repo_config=repo_config, + fetchlist_dict=_future_fetchlist( + self._event_loop, portdb, repo_config, cp, cpv_list, + max_jobs=1), + repo_config=repo_config, gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars, force_sign_key=self._force_sign_key) @@ -91,3 +84,90 @@ class ManifestScheduler(AsyncScheduler): noiselevel=-1) AsyncScheduler._task_exit(self, task) + + +def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list, + max_jobs=None, max_load=None): + """ + Asynchronous form of FetchlistDict, with max_jobs and max_load + parameters in order to control async_aux_get concurrency. + + @param loop: event loop + @type loop: EventLoop + @param portdb: portdbapi instance + @type portdb: portdbapi + @param repo_config: repository configuration for a Manifest + @type repo_config: RepoConfig + @param cp: cp for a Manifest + @type cp: str + @param cpv_list: list of ebuild cpv values for a Manifest + @type cpv_list: list + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @return: a Future resulting in a Mapping compatible with FetchlistDict + @rtype: asyncio.Future (or compatible) + """ + loop = getattr(loop, '_asyncio_wrapper', loop) + result = loop.create_future() + futures = {} + + def future_generator(): + for cpv in cpv_list: + future = futures[cpv] = portdb.async_fetch_map( + cpv, mytree=repo_config.location, loop=loop) + yield future + + completed_iter = async_iter_completed( + future_generator(), + max_jobs=max_jobs, + max_load=max_load, + loop=loop, + ) + + def handle_result(future_done_set): + if result.cancelled(): + return + + try: + handle_result.current_task = next(completed_iter) + except StopIteration: + pass + 
else: + handle_result.current_task.add_done_callback(handle_result) + return + + e = None + for future in futures.values(): + if (future.done() and future.exception() is not None): + # Retrieve exceptions from all futures in order to + # avoid triggering the event loop's error handler. + e = future.exception() + + if e is None: + result.set_result(dict((k, list(v.result())) + for k, v in futures.items())) + else: + result.set_exception(e) + + try: + handle_result.current_task = next(completed_iter) + except StopIteration: + handle_result.current_task = None + result.set_result({}) + else: + handle_result.current_task.add_done_callback(handle_result) + + def cancel_callback(result): + if (result.cancelled() and + handle_result.current_task is not None and + not handle_result.current_task.done()): + handle_result.current_task.cancel() + + result.add_done_callback(cancel_callback) + + return result diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py index 0ee2b910d..6f5fe5b16 100644 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py @@ -8,8 +8,12 @@ import subprocess from portage import os from portage import _unicode_encode, _encodings from portage.const import MANIFEST2_IDENTIFIERS +from portage.dep import _repo_separator +from portage.exception import InvalidDependString +from portage.localization import _ from portage.util import (atomic_ofstream, grablines, shlex_split, varexpand, writemsg) +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from _emerge.CompositeTask import CompositeTask @@ -29,6 +33,24 @@ class ManifestTask(CompositeTask): def _start(self): self._manifest_path = os.path.join(self.repo_config.location, self.cp, "Manifest") + + self._start_task( + 
AsyncTaskFuture(future=self.fetchlist_dict), + self._start_with_fetchlist) + + def _start_with_fetchlist(self, fetchlist_task): + if self._default_exit(fetchlist_task) != os.EX_OK: + if not self.fetchlist_dict.cancelled(): + try: + self.fetchlist_dict.result() + except InvalidDependString as e: + writemsg( + _("!!! %s%s%s: SRC_URI: %s\n") % + (self.cp, _repo_separator, self.repo_config.name, e), + noiselevel=-1) + self._async_wait() + return + self.fetchlist_dict = self.fetchlist_dict.result() manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, scheduler=self.scheduler) diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py index bd934460a..1f139b256 100644 --- a/pym/portage/tests/dbapi/test_portdb_cache.py +++ b/pym/portage/tests/dbapi/test_portdb_cache.py @@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase): portage_python = portage._python_interpreter egencache_cmd = (portage_python, "-b", "-Wd", os.path.join(self.bindir, "egencache"), + "--update-manifests", "--sign-manifests=n", "--repo", "test_repo", "--repositories-configuration", settings.repositories.config_string()) python_cmd = (portage_python, "-b", "-Wd", "-c") -- 2.13.6