Re: [gentoo-portage-dev] [PATCH] BinpkgFetcher: use async lock (bug 614110)
On 04/21/2018 03:07 PM, Brian Dolbec wrote: > On Sat, 21 Apr 2018 12:27:28 -0700 > Zac Medico wrote: > >> In order to avoid event loop recursion, convert the >> _BinpkgFetcherProcess.lock() method to an async_lock >> method for use by BinpkgFetcher. >> >> Bug: https://bugs.gentoo.org/614110 >> --- >> pym/_emerge/BinpkgFetcher.py | 53 >> +++- 1 file changed, 33 >> insertions(+), 20 deletions(-) >> >> diff --git a/pym/_emerge/BinpkgFetcher.py >> b/pym/_emerge/BinpkgFetcher.py index 5ca7a45cf..2bbc0a26f 100644 >> --- a/pym/_emerge/BinpkgFetcher.py >> +++ b/pym/_emerge/BinpkgFetcher.py >> @@ -32,11 +32,24 @@ class BinpkgFetcher(CompositeTask): >> pkg.cpv) + ".partial" >> >> def _start(self): >> -self._start_task( >> - >> _BinpkgFetcherProcess(background=self.background, >> -logfile=self.logfile, pkg=self.pkg, >> pkg_path=self.pkg_path, >> -pretend=self.pretend, >> scheduler=self.scheduler), >> -self._fetcher_exit) >> +fetcher = >> _BinpkgFetcherProcess(background=self.background, >> +logfile=self.logfile, pkg=self.pkg, >> pkg_path=self.pkg_path, >> +pretend=self.pretend, >> scheduler=self.scheduler) + >> +if not self.pretend: >> + >> portage.util.ensure_dirs(os.path.dirname(self.pkg_path)) >> +if "distlocks" in >> self.pkg.root_config.settings.features: >> +self._start_task( >> + >> AsyncTaskFuture(future=fetcher.async_lock()), >> + >> functools.partial(self._start_locked, fetcher)) >> +return >> + >> +self._start_task(fetcher, self._fetcher_exit) >> + >> +def _start_locked(self, fetcher, lock_task): >> +self._assert_current(lock_task) >> +lock_task.future.result() >> +self._start_task(fetcher, self._fetcher_exit) >> >> def _fetcher_exit(self, fetcher): >> self._assert_current(fetcher) >> @@ -68,13 +81,8 @@ class _BinpkgFetcherProcess(SpawnProcess): >> pretend = self.pretend >> bintree = pkg.root_config.trees["bintree"] >> settings = bintree.settings >> -use_locks = "distlocks" in settings.features >> pkg_path = self.pkg_path >> >> -if not pretend: >> - >> 
portage.util.ensure_dirs(os.path.dirname(pkg_path)) >> -if use_locks: >> -self.lock() >> exists = os.path.exists(pkg_path) >> resume = exists and os.path.basename(pkg_path) in >> bintree.invalids if not (pretend or resume): >> @@ -184,7 +192,7 @@ class _BinpkgFetcherProcess(SpawnProcess): >> except >> OSError: pass >> >> -def lock(self): >> +def async_lock(self): >> """ >> This raises an AlreadyLocked exception if lock() is >> called while a lock is already held. In order to avoid this, call >> @@ -194,17 +202,22 @@ class _BinpkgFetcherProcess(SpawnProcess): >> if self._lock_obj is not None: >> raise self.AlreadyLocked((self._lock_obj,)) >> >> -async_lock = AsynchronousLock(path=self.pkg_path, >> -scheduler=self.scheduler) >> -async_lock.start() >> +result = self.scheduler.create_future() >> >> -if async_lock.wait() != os.EX_OK: >> -# TODO: Use CompositeTask for better >> handling, like in EbuildPhase. >> -raise AssertionError("AsynchronousLock >> failed with returncode %s" \ >> -% (async_lock.returncode,)) >> +def acquired_lock(async_lock): >> +if async_lock.wait() == os.EX_OK: >> +self.locked = True >> +result.set_result(None) >> +else: >> +result.set_exception(AssertionError( >> +"AsynchronousLock failed >> with returncode %s" >> +% (async_lock.returncode,))) >> >> -self._lock_obj = async_lock >> -self.locked = True >> +self._lock_obj = AsynchronousLock(path=self.pkg_path, >> +scheduler=self.scheduler) >> +self._lock_obj.addExitListener(acquired_lock) >> +self._lock_obj.start() >> +return result >> >> class AlreadyLocked(portage.exception.PortageException): >> pass > > > Looks fine to me :) > Thanks, merged: https://gitweb.gentoo.org/proj/portage.git/commit/?id=adf2e6dd57b5bcaa4a668e3085024ebc3224a2ca -- Thanks, Zac signature.asc Description: OpenPGP digital signature
[gentoo-portage-dev] [PATCH 3/5] EbuildFetcher: add _async_uri_map method (bug 653810)
Add an _async_uri_map method to replace the synchronous _get_uri_map method. This will be used to prevent event loop recursion. Bug: https://bugs.gentoo.org/653810 --- pym/_emerge/EbuildFetcher.py | 27 --- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py index 81eeb6dcd..1f574740b 100644 --- a/pym/_emerge/EbuildFetcher.py +++ b/pym/_emerge/EbuildFetcher.py @@ -49,7 +49,7 @@ class _EbuildFetcherProcess(ForkProcess): "pkg", "prefetch", "_digests", "_manifest", "_settings", "_uri_map") def already_fetched(self, settings): - uri_map = self._get_uri_map() + uri_map = self.scheduler.run_until_complete(self._async_uri_map()) if not uri_map: return True @@ -125,7 +125,7 @@ class _EbuildFetcherProcess(ForkProcess): ebuild_path = self._get_ebuild_path() try: - uri_map = self._get_uri_map() + uri_map = self.scheduler.run_until_complete(self._async_uri_map()) except portage.exception.InvalidDependString as e: msg_lines = [] msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \ @@ -210,21 +210,34 @@ class _EbuildFetcherProcess(ForkProcess): self._digests = self._get_manifest().getTypeDigests("DIST") return self._digests - def _get_uri_map(self): + def _async_uri_map(self): """ - This can raise InvalidDependString from portdbapi.getFetchMap(). + This calls the portdbapi.async_fetch_map method and returns the + resulting Future (may contain InvalidDependString exception). 
""" if self._uri_map is not None: - return self._uri_map + result = self.scheduler.create_future() + result.set_result(self._uri_map) + return result + pkgdir = os.path.dirname(self._get_ebuild_path()) mytree = os.path.dirname(os.path.dirname(pkgdir)) use = None if not self.fetchall: use = self.pkg.use.enabled portdb = self.pkg.root_config.trees["porttree"].dbapi - self._uri_map = portdb.getFetchMap(self.pkg.cpv, + + def cache_result(result): + try: + self._uri_map = result.result() + except Exception: + # The caller handles this when it retrieves the result. + pass + + result = portdb.async_fetch_map(self.pkg.cpv, useflags=use, mytree=mytree) - return self._uri_map + result.add_done_callback(cache_result) + return result def _prefetch_size_ok(self, uri_map, settings, ebuild_path): distdir = settings["DISTDIR"] -- 2.13.6
[gentoo-portage-dev] [PATCH 4/5] EbuildFetcher: use _async_uri_map in _start (bug 653810)
Use _async_uri_map to avoid event loop recursion in _start. Bug: https://bugs.gentoo.org/653810 --- pym/_emerge/EbuildFetcher.py | 34 ++ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py index 1f574740b..8f6cc60fe 100644 --- a/pym/_emerge/EbuildFetcher.py +++ b/pym/_emerge/EbuildFetcher.py @@ -13,6 +13,7 @@ from portage import _unicode_decode from portage.checksum import _hash_filter from portage.elog.messages import eerror from portage.package.ebuild.fetch import _check_distfile, fetch +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.ForkProcess import ForkProcess from portage.util._pty import _create_pty_or_pipe from _emerge.CompositeTask import CompositeTask @@ -40,6 +41,25 @@ class EbuildFetcher(CompositeTask): return self._fetcher_proc.already_fetched(settings) def _start(self): + self._start_task( + AsyncTaskFuture(future=self._fetcher_proc._async_uri_map()), + self._start_fetch) + + def _start_fetch(self, uri_map_task): + self._assert_current(uri_map_task) + try: + uri_map = uri_map_task.future.result() + except portage.exception.InvalidDependString as e: + msg_lines = [] + msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \ + (self.pkg.cpv, e) + msg_lines.append(msg) + self._fetcher_proc._eerror(msg_lines) + self._current_task = None + self.returncode = 1 + self._async_wait() + return + self._start_task(self._fetcher_proc, self._default_final_exit) @@ -123,18 +143,8 @@ class _EbuildFetcherProcess(ForkProcess): root_config = self.pkg.root_config portdb = root_config.trees["porttree"].dbapi ebuild_path = self._get_ebuild_path() - - try: - uri_map = self.scheduler.run_until_complete(self._async_uri_map()) - except portage.exception.InvalidDependString as e: - msg_lines = [] - msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \ - (self.pkg.cpv, e) - msg_lines.append(msg) - self._eerror(msg_lines) - self._set_returncode((self.pid, 1 
<< 8)) - self._async_wait() - return + # This is initialized by an earlier _async_uri_map call. + uri_map = self._uri_map if not uri_map: # Nothing to fetch. -- 2.13.6
[gentoo-portage-dev] [PATCH 2/5] EbuildFetcher: inherit CompositeTask (bug 653810)
Make EbuildFetcher inherit CompositeTask, and move remaining code to a _EbuildFetcherProcess class that still inherits ForkProcess. The CompositeTask framework will be used to split EbuildFetcher into subtasks, in order to prevent event loop recursion. Bug: https://bugs.gentoo.org/653810 --- pym/_emerge/EbuildFetcher.py | 22 +++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py index d98d00736..81eeb6dcd 100644 --- a/pym/_emerge/EbuildFetcher.py +++ b/pym/_emerge/EbuildFetcher.py @@ -15,12 +15,17 @@ from portage.elog.messages import eerror from portage.package.ebuild.fetch import _check_distfile, fetch from portage.util._async.ForkProcess import ForkProcess from portage.util._pty import _create_pty_or_pipe +from _emerge.CompositeTask import CompositeTask -class EbuildFetcher(ForkProcess): + +class EbuildFetcher(CompositeTask): __slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall", - "pkg", "prefetch") + \ - ("_digests", "_manifest", "_settings", "_uri_map") + "logfile", "pkg", "prefetch", "_fetcher_proc") + + def __init__(self, **kwargs): + CompositeTask.__init__(self, **kwargs) + self._fetcher_proc = _EbuildFetcherProcess(**kwargs) def already_fetched(self, settings): """ @@ -32,7 +37,18 @@ class EbuildFetcher(ForkProcess): such messages. This will raise InvalidDependString if SRC_URI is invalid. """ + return self._fetcher_proc.already_fetched(settings) + + def _start(self): + self._start_task(self._fetcher_proc, self._default_final_exit) + +class _EbuildFetcherProcess(ForkProcess): + + __slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall", + "pkg", "prefetch", "_digests", "_manifest", "_settings", "_uri_map") + + def already_fetched(self, settings): uri_map = self._get_uri_map() if not uri_map: return True -- 2.13.6
[gentoo-portage-dev] [PATCH 1/5] portdbapi: add async_fetch_map method (bug 653810)
Add a portdbapi.async_fetch_map method which is identical to the existing portdbapi.getFetchMap method, but returns a future. This will be used by EbuildFetcher in order to avoid event loop recursion. Bug: https://bugs.gentoo.org/653810 --- pym/portage/dbapi/porttree.py | 75 +-- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py index 951e5760a..3cd929963 100644 --- a/pym/portage/dbapi/porttree.py +++ b/pym/portage/dbapi/porttree.py @@ -728,25 +728,66 @@ class portdbapi(dbapi): URIs. @rtype: dict """ + loop = self._event_loop + return loop.run_until_complete( + self.async_fetch_map(mypkg, useflags=useflags, + mytree=mytree, loop=loop)) - try: - eapi, myuris = self.aux_get(mypkg, - ["EAPI", "SRC_URI"], mytree=mytree) - except KeyError: - # Convert this to an InvalidDependString exception since callers - # already handle it. - raise portage.exception.InvalidDependString( - "getFetchMap(): aux_get() error reading "+mypkg+"; aborting.") + def async_fetch_map(self, mypkg, useflags=None, mytree=None, loop=None): + """ + Asynchronous form of getFetchMap. - if not eapi_is_supported(eapi): - # Convert this to an InvalidDependString exception - # since callers already handle it. - raise portage.exception.InvalidDependString( - "getFetchMap(): '%s' has unsupported EAPI: '%s'" % \ - (mypkg, eapi)) - - return _parse_uri_map(mypkg, {'EAPI':eapi,'SRC_URI':myuris}, - use=useflags) + @param mypkg: cpv for an ebuild + @type mypkg: String + @param useflags: a collection of enabled USE flags, for evaluation of + conditionals + @type useflags: set, or None to enable all conditionals + @param mytree: The canonical path of the tree in which the ebuild + is located, or None for automatic lookup + @type mypkg: String + @param loop: event loop (defaults to global event loop) + @type loop: EventLoop + @return: A future that results in a dict which maps each file name to + a set of alternative URIs. 
+ @rtype: asyncio.Future (or compatible) + """ + loop = loop or global_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + result = loop.create_future() + + def aux_get_done(aux_get_future): + if result.cancelled(): + return + if aux_get_future.exception() is not None: + if isinstance(future.exception(), PortageKeyError): + # Convert this to an InvalidDependString exception since + # callers already handle it. + result.set_exception(portage.exception.InvalidDependString( + "getFetchMap(): aux_get() error reading " + + mypkg + "; aborting.")) + else: + result.set_exception(future.exception()) + return + + eapi, myuris = aux_get_future.result() + + if not eapi_is_supported(eapi): + # Convert this to an InvalidDependString exception + # since callers already handle it. + result.set_exception(portage.exception.InvalidDependString( + "getFetchMap(): '%s' has unsupported EAPI: '%s'" % \ + (mypkg, eapi))) + return + + result.set_result(_parse_uri_map(mypkg, + {'EAPI':eapi,'SRC_URI':myuris}, use=useflags)) + + aux_get_future = self.async_aux_get( + mypkg, ["EAPI", "SRC_URI"], mytree=mytree) + result.add_done_callback(lambda result: + aux_get_future.cancel() if result.cancelled() else None) + aux_get_future.add_done_callback(aux_get_done) + return result def getfetchsizes(self, mypkg, useflags=None, debug=0, myrepo=None):
[gentoo-portage-dev] [PATCH 5/5] EbuildFetcher: add async_already_fetched method (bug 653810)
Add an async_already_fetched method to replace the synchronous already_fetched method, and use it to prevent event loop recursion. Bug: https://bugs.gentoo.org/653810 --- pym/_emerge/EbuildBuild.py | 8 +++- pym/_emerge/EbuildFetcher.py | 30 -- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py index 9d4afd0ea..21c7f81ce 100644 --- a/pym/_emerge/EbuildBuild.py +++ b/pym/_emerge/EbuildBuild.py @@ -207,8 +207,14 @@ class EbuildBuild(CompositeTask): logfile=self.settings.get('PORTAGE_LOG_FILE'), pkg=self.pkg, scheduler=self.scheduler) + self._start_task(AsyncTaskFuture( + future=fetcher.async_already_fetched(self.settings)), + functools.partial(self._start_fetch, fetcher)) + + def _start_fetch(self, fetcher, already_fetched_task): + self._assert_current(already_fetched_task) try: - already_fetched = fetcher.already_fetched(self.settings) + already_fetched = already_fetched_task.future.result() except portage.exception.InvalidDependString as e: msg_lines = [] msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \ diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py index 8f6cc60fe..589eda85d 100644 --- a/pym/_emerge/EbuildFetcher.py +++ b/pym/_emerge/EbuildFetcher.py @@ -28,7 +28,7 @@ class EbuildFetcher(CompositeTask): CompositeTask.__init__(self, **kwargs) self._fetcher_proc = _EbuildFetcherProcess(**kwargs) - def already_fetched(self, settings): + def async_already_fetched(self, settings): """ Returns True if all files already exist locally and have correct digests, otherwise return False. When returning True, appropriate @@ -38,7 +38,7 @@ class EbuildFetcher(CompositeTask): such messages. This will raise InvalidDependString if SRC_URI is invalid. 
""" - return self._fetcher_proc.already_fetched(settings) + return self._fetcher_proc.async_already_fetched(settings) def _start(self): self._start_task( @@ -68,11 +68,29 @@ class _EbuildFetcherProcess(ForkProcess): __slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall", "pkg", "prefetch", "_digests", "_manifest", "_settings", "_uri_map") - def already_fetched(self, settings): - uri_map = self.scheduler.run_until_complete(self._async_uri_map()) - if not uri_map: - return True + def async_already_fetched(self, settings): + result = self.scheduler.create_future() + + def uri_map_done(uri_map_future): + if uri_map_future.exception() is not None or result.cancelled(): + if not result.cancelled(): + result.set_exception(uri_map_future.exception()) + return + + uri_map = uri_map_future.result() + if uri_map: + result.set_result( + self._check_already_fetched(settings, uri_map)) + else: + result.set_result(True) + + uri_map_future = self._async_uri_map() + result.add_done_callback(lambda result: + aux_get_future.cancel() if result.cancelled() else None) + uri_map_future.add_done_callback(uri_map_done) + return result + def _check_already_fetched(self, settings, uri_map): digests = self._get_digests() distdir = settings["DISTDIR"] allow_missing = self._get_manifest().allow_missing -- 2.13.6
[gentoo-portage-dev] [PATCH 0/5] EbuildFetcher._get_uri_map(): fix event loop recursion (bug 653810)
Bug: https://bugs.gentoo.org/653810 Zac Medico (5): portdbapi: add async_fetch_map method (bug 653810) EbuildFetcher: inherit CompositeTask (bug 653810) EbuildFetcher: add _async_uri_map method (bug 653810) EbuildFetcher: use _async_uri_map in _start (bug 653810) EbuildFetcher: add async_already_fetched method (bug 653810) pym/_emerge/EbuildBuild.py| 8 +++- pym/_emerge/EbuildFetcher.py | 105 -- pym/portage/dbapi/porttree.py | 75 +++--- 3 files changed, 146 insertions(+), 42 deletions(-) -- 2.13.6