Re: [gentoo-portage-dev] [PATCH] BinpkgFetcher: use async lock (bug 614110)

2018-04-22 Thread Zac Medico
On 04/21/2018 03:07 PM, Brian Dolbec wrote:
> On Sat, 21 Apr 2018 12:27:28 -0700
> Zac Medico wrote:
> 
>> In order to avoid event loop recursion, convert the
>> _BinpkgFetcherProcess.lock() method to an async_lock
>> method for use by BinpkgFetcher.
>>
>> Bug: https://bugs.gentoo.org/614110
>> ---
>>  pym/_emerge/BinpkgFetcher.py | 53 +++-
>>  1 file changed, 33 insertions(+), 20 deletions(-)
>>
>> diff --git a/pym/_emerge/BinpkgFetcher.py
>> b/pym/_emerge/BinpkgFetcher.py index 5ca7a45cf..2bbc0a26f 100644
>> --- a/pym/_emerge/BinpkgFetcher.py
>> +++ b/pym/_emerge/BinpkgFetcher.py
>> @@ -32,11 +32,24 @@ class BinpkgFetcher(CompositeTask):
>>  pkg.cpv) + ".partial"
>>  
>>  def _start(self):
>> -self._start_task(
>> -
>> _BinpkgFetcherProcess(background=self.background,
>> -logfile=self.logfile, pkg=self.pkg,
>> pkg_path=self.pkg_path,
>> -pretend=self.pretend,
>> scheduler=self.scheduler),
>> -self._fetcher_exit)
>> +fetcher =
>> _BinpkgFetcherProcess(background=self.background,
>> +logfile=self.logfile, pkg=self.pkg,
>> pkg_path=self.pkg_path,
>> +pretend=self.pretend,
>> scheduler=self.scheduler) +
>> +if not self.pretend:
>> +
>> portage.util.ensure_dirs(os.path.dirname(self.pkg_path))
>> +if "distlocks" in
>> self.pkg.root_config.settings.features:
>> +self._start_task(
>> +
>> AsyncTaskFuture(future=fetcher.async_lock()),
>> +
>> functools.partial(self._start_locked, fetcher))
>> +return
>> +
>> +self._start_task(fetcher, self._fetcher_exit)
>> +
>> +def _start_locked(self, fetcher, lock_task):
>> +self._assert_current(lock_task)
>> +lock_task.future.result()
>> +self._start_task(fetcher, self._fetcher_exit)
>>  
>>  def _fetcher_exit(self, fetcher):
>>  self._assert_current(fetcher)
>> @@ -68,13 +81,8 @@ class _BinpkgFetcherProcess(SpawnProcess):
>>  pretend = self.pretend
>>  bintree = pkg.root_config.trees["bintree"]
>>  settings = bintree.settings
>> -use_locks = "distlocks" in settings.features
>>  pkg_path = self.pkg_path
>>  
>> -if not pretend:
>> -
>> portage.util.ensure_dirs(os.path.dirname(pkg_path))
>> -if use_locks:
>> -self.lock()
>>  exists = os.path.exists(pkg_path)
>>  resume = exists and os.path.basename(pkg_path) in
>> bintree.invalids if not (pretend or resume):
>> @@ -184,7 +192,7 @@ class _BinpkgFetcherProcess(SpawnProcess):
>>  except
>> OSError: pass
>>  
>> -def lock(self):
>> +def async_lock(self):
>>  """
>>  This raises an AlreadyLocked exception if lock() is
>> called while a lock is already held. In order to avoid this, call
>> @@ -194,17 +202,22 @@ class _BinpkgFetcherProcess(SpawnProcess):
>>  if self._lock_obj is not None:
>>  raise self.AlreadyLocked((self._lock_obj,))
>>  
>> -async_lock = AsynchronousLock(path=self.pkg_path,
>> -scheduler=self.scheduler)
>> -async_lock.start()
>> +result = self.scheduler.create_future()
>>  
>> -if async_lock.wait() != os.EX_OK:
>> -# TODO: Use CompositeTask for better
>> handling, like in EbuildPhase.
>> -raise AssertionError("AsynchronousLock
>> failed with returncode %s" \
>> -% (async_lock.returncode,))
>> +def acquired_lock(async_lock):
>> +if async_lock.wait() == os.EX_OK:
>> +self.locked = True
>> +result.set_result(None)
>> +else:
>> +result.set_exception(AssertionError(
>> +"AsynchronousLock failed
>> with returncode %s"
>> +% (async_lock.returncode,)))
>>  
>> -self._lock_obj = async_lock
>> -self.locked = True
>> +self._lock_obj = AsynchronousLock(path=self.pkg_path,
>> +scheduler=self.scheduler)
>> +self._lock_obj.addExitListener(acquired_lock)
>> +self._lock_obj.start()
>> +return result
>>  
>>  class AlreadyLocked(portage.exception.PortageException):
>>  pass
> 
> 
> Looks fine to me :)
> 

Thanks, merged:

https://gitweb.gentoo.org/proj/portage.git/commit/?id=adf2e6dd57b5bcaa4a668e3085024ebc3224a2ca

-- 
Thanks,
Zac



signature.asc
Description: OpenPGP digital signature


[gentoo-portage-dev] [PATCH 3/5] EbuildFetcher: add _async_uri_map method (bug 653810)

2018-04-22 Thread Zac Medico
Add an _async_uri_map method to replace the synchronous _get_uri_map
method. This will be used to prevent event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
 pym/_emerge/EbuildFetcher.py | 27 ---
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index 81eeb6dcd..1f574740b 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -49,7 +49,7 @@ class _EbuildFetcherProcess(ForkProcess):
"pkg", "prefetch", "_digests", "_manifest", "_settings", 
"_uri_map")
 
def already_fetched(self, settings):
-   uri_map = self._get_uri_map()
+   uri_map = 
self.scheduler.run_until_complete(self._async_uri_map())
if not uri_map:
return True
 
@@ -125,7 +125,7 @@ class _EbuildFetcherProcess(ForkProcess):
ebuild_path = self._get_ebuild_path()
 
try:
-   uri_map = self._get_uri_map()
+   uri_map = 
self.scheduler.run_until_complete(self._async_uri_map())
except portage.exception.InvalidDependString as e:
msg_lines = []
msg = "Fetch failed for '%s' due to invalid SRC_URI: 
%s" % \
@@ -210,21 +210,34 @@ class _EbuildFetcherProcess(ForkProcess):
self._digests = 
self._get_manifest().getTypeDigests("DIST")
return self._digests
 
-   def _get_uri_map(self):
+   def _async_uri_map(self):
"""
-   This can raise InvalidDependString from portdbapi.getFetchMap().
+   This calls the portdbapi.async_fetch_map method and returns the
+   resulting Future (may contain InvalidDependString exception).
"""
if self._uri_map is not None:
-   return self._uri_map
+   result = self.scheduler.create_future()
+   result.set_result(self._uri_map)
+   return result
+
pkgdir = os.path.dirname(self._get_ebuild_path())
mytree = os.path.dirname(os.path.dirname(pkgdir))
use = None
if not self.fetchall:
use = self.pkg.use.enabled
portdb = self.pkg.root_config.trees["porttree"].dbapi
-   self._uri_map = portdb.getFetchMap(self.pkg.cpv,
+
+   def cache_result(result):
+   try:
+   self._uri_map = result.result()
+   except Exception:
+   # The caller handles this when it retrieves the 
result.
+   pass
+
+   result = portdb.async_fetch_map(self.pkg.cpv,
useflags=use, mytree=mytree)
-   return self._uri_map
+   result.add_done_callback(cache_result)
+   return result
 
def _prefetch_size_ok(self, uri_map, settings, ebuild_path):
distdir = settings["DISTDIR"]
-- 
2.13.6




[gentoo-portage-dev] [PATCH 4/5] EbuildFetcher: use _async_uri_map in _start (bug 653810)

2018-04-22 Thread Zac Medico
Use _async_uri_map to avoid event loop recursion in _start.

Bug: https://bugs.gentoo.org/653810
---
 pym/_emerge/EbuildFetcher.py | 34 ++
 1 file changed, 22 insertions(+), 12 deletions(-)

diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index 1f574740b..8f6cc60fe 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -13,6 +13,7 @@ from portage import _unicode_decode
 from portage.checksum import _hash_filter
 from portage.elog.messages import eerror
 from portage.package.ebuild.fetch import _check_distfile, fetch
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.ForkProcess import ForkProcess
 from portage.util._pty import _create_pty_or_pipe
 from _emerge.CompositeTask import CompositeTask
@@ -40,6 +41,25 @@ class EbuildFetcher(CompositeTask):
return self._fetcher_proc.already_fetched(settings)
 
def _start(self):
+   self._start_task(
+   
AsyncTaskFuture(future=self._fetcher_proc._async_uri_map()),
+   self._start_fetch)
+
+   def _start_fetch(self, uri_map_task):
+   self._assert_current(uri_map_task)
+   try:
+   uri_map = uri_map_task.future.result()
+   except portage.exception.InvalidDependString as e:
+   msg_lines = []
+   msg = "Fetch failed for '%s' due to invalid SRC_URI: 
%s" % \
+   (self.pkg.cpv, e)
+   msg_lines.append(msg)
+   self._fetcher_proc._eerror(msg_lines)
+   self._current_task = None
+   self.returncode = 1
+   self._async_wait()
+   return
+
self._start_task(self._fetcher_proc, self._default_final_exit)
 
 
@@ -123,18 +143,8 @@ class _EbuildFetcherProcess(ForkProcess):
root_config = self.pkg.root_config
portdb = root_config.trees["porttree"].dbapi
ebuild_path = self._get_ebuild_path()
-
-   try:
-   uri_map = 
self.scheduler.run_until_complete(self._async_uri_map())
-   except portage.exception.InvalidDependString as e:
-   msg_lines = []
-   msg = "Fetch failed for '%s' due to invalid SRC_URI: 
%s" % \
-   (self.pkg.cpv, e)
-   msg_lines.append(msg)
-   self._eerror(msg_lines)
-   self._set_returncode((self.pid, 1 << 8))
-   self._async_wait()
-   return
+   # This is initialized by an earlier _async_uri_map call.
+   uri_map = self._uri_map
 
if not uri_map:
# Nothing to fetch.
-- 
2.13.6




[gentoo-portage-dev] [PATCH 2/5] EbuildFetcher: inherit CompositeTask (bug 653810)

2018-04-22 Thread Zac Medico
Make EbuildFetcher inherit CompositeTask, and move remaining code
to a _EbuildFetcherProcess class that still inherits ForkProcess.
The CompositeTask framework will be used to split EbuildFetcher
into subtasks, in order to prevent event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
 pym/_emerge/EbuildFetcher.py | 22 +++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index d98d00736..81eeb6dcd 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -15,12 +15,17 @@ from portage.elog.messages import eerror
 from portage.package.ebuild.fetch import _check_distfile, fetch
 from portage.util._async.ForkProcess import ForkProcess
 from portage.util._pty import _create_pty_or_pipe
+from _emerge.CompositeTask import CompositeTask
 
-class EbuildFetcher(ForkProcess):
+
+class EbuildFetcher(CompositeTask):
 
__slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall",
-   "pkg", "prefetch") + \
-   ("_digests", "_manifest", "_settings", "_uri_map")
+   "logfile", "pkg", "prefetch", "_fetcher_proc")
+
+   def __init__(self, **kwargs):
+   CompositeTask.__init__(self, **kwargs)
+   self._fetcher_proc = _EbuildFetcherProcess(**kwargs)
 
def already_fetched(self, settings):
"""
@@ -32,7 +37,18 @@ class EbuildFetcher(ForkProcess):
such messages. This will raise InvalidDependString if SRC_URI is
invalid.
"""
+   return self._fetcher_proc.already_fetched(settings)
+
+   def _start(self):
+   self._start_task(self._fetcher_proc, self._default_final_exit)
+
 
+class _EbuildFetcherProcess(ForkProcess):
+
+   __slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall",
+   "pkg", "prefetch", "_digests", "_manifest", "_settings", 
"_uri_map")
+
+   def already_fetched(self, settings):
uri_map = self._get_uri_map()
if not uri_map:
return True
-- 
2.13.6




[gentoo-portage-dev] [PATCH 1/5] portdbapi: add async_fetch_map method (bug 653810)

2018-04-22 Thread Zac Medico
Add a portdbapi.async_fetch_map method which is identical to the
existing portdbapi.getFetchMap method, but returns a future. This
will be used by EbuildFetcher in order to avoid event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
 pym/portage/dbapi/porttree.py | 75 +--
 1 file changed, 58 insertions(+), 17 deletions(-)

diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
index 951e5760a..3cd929963 100644
--- a/pym/portage/dbapi/porttree.py
+++ b/pym/portage/dbapi/porttree.py
@@ -728,25 +728,66 @@ class portdbapi(dbapi):
URIs.
@rtype: dict
"""
+   loop = self._event_loop
+   return loop.run_until_complete(
+   self.async_fetch_map(mypkg, useflags=useflags,
+   mytree=mytree, loop=loop))
 
-   try:
-   eapi, myuris = self.aux_get(mypkg,
-   ["EAPI", "SRC_URI"], mytree=mytree)
-   except KeyError:
-   # Convert this to an InvalidDependString exception 
since callers
-   # already handle it.
-   raise portage.exception.InvalidDependString(
-   "getFetchMap(): aux_get() error reading 
"+mypkg+"; aborting.")
+   def async_fetch_map(self, mypkg, useflags=None, mytree=None, loop=None):
+   """
+   Asynchronous form of getFetchMap.
 
-   if not eapi_is_supported(eapi):
-   # Convert this to an InvalidDependString exception
-   # since callers already handle it.
-   raise portage.exception.InvalidDependString(
-   "getFetchMap(): '%s' has unsupported EAPI: 
'%s'" % \
-   (mypkg, eapi))
-
-   return _parse_uri_map(mypkg, {'EAPI':eapi,'SRC_URI':myuris},
-   use=useflags)
+   @param mypkg: cpv for an ebuild
+   @type mypkg: String
+   @param useflags: a collection of enabled USE flags, for 
evaluation of
+   conditionals
+   @type useflags: set, or None to enable all conditionals
+   @param mytree: The canonical path of the tree in which the 
ebuild
+   is located, or None for automatic lookup
+   @type mytree: String
+   @param loop: event loop (defaults to global event loop)
+   @type loop: EventLoop
+   @return: A future that results in a dict which maps each file 
name to
+   a set of alternative URIs.
+   @rtype: asyncio.Future (or compatible)
+   """
+   loop = loop or global_event_loop()
+   loop = getattr(loop, '_asyncio_wrapper', loop)
+   result = loop.create_future()
+
+   def aux_get_done(aux_get_future):
+   if result.cancelled():
+   return
+   if aux_get_future.exception() is not None:
+   if isinstance(aux_get_future.exception(), 
PortageKeyError):
+   # Convert this to an 
InvalidDependString exception since
+   # callers already handle it.
+   
result.set_exception(portage.exception.InvalidDependString(
+   "getFetchMap(): aux_get() error 
reading "
+   + mypkg + "; aborting."))
+   else:
+   result.set_exception(aux_get_future.exception())
+   return
+
+   eapi, myuris = aux_get_future.result()
+
+   if not eapi_is_supported(eapi):
+   # Convert this to an InvalidDependString 
exception
+   # since callers already handle it.
+   
result.set_exception(portage.exception.InvalidDependString(
+   "getFetchMap(): '%s' has unsupported 
EAPI: '%s'" % \
+   (mypkg, eapi)))
+   return
+
+   result.set_result(_parse_uri_map(mypkg,
+   {'EAPI':eapi,'SRC_URI':myuris}, use=useflags))
+
+   aux_get_future = self.async_aux_get(
+   mypkg, ["EAPI", "SRC_URI"], mytree=mytree)
+   result.add_done_callback(lambda result:
+   aux_get_future.cancel() if result.cancelled() else None)
+   aux_get_future.add_done_callback(aux_get_done)
+   return result
 
def getfetchsizes(self, mypkg, useflags=None, debug=0, myrepo=None):

[gentoo-portage-dev] [PATCH 5/5] EbuildFetcher: add async_already_fetched method (bug 653810)

2018-04-22 Thread Zac Medico
Add an async_already_fetched method to replace the synchronous
already_fetched method, and use it to prevent event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
 pym/_emerge/EbuildBuild.py   |  8 +++-
 pym/_emerge/EbuildFetcher.py | 30 --
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py
index 9d4afd0ea..21c7f81ce 100644
--- a/pym/_emerge/EbuildBuild.py
+++ b/pym/_emerge/EbuildBuild.py
@@ -207,8 +207,14 @@ class EbuildBuild(CompositeTask):
logfile=self.settings.get('PORTAGE_LOG_FILE'),
pkg=self.pkg, scheduler=self.scheduler)
 
+   self._start_task(AsyncTaskFuture(
+   future=fetcher.async_already_fetched(self.settings)),
+   functools.partial(self._start_fetch, fetcher))
+
+   def _start_fetch(self, fetcher, already_fetched_task):
+   self._assert_current(already_fetched_task)
try:
-   already_fetched = fetcher.already_fetched(self.settings)
+   already_fetched = already_fetched_task.future.result()
except portage.exception.InvalidDependString as e:
msg_lines = []
msg = "Fetch failed for '%s' due to invalid SRC_URI: 
%s" % \
diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index 8f6cc60fe..589eda85d 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -28,7 +28,7 @@ class EbuildFetcher(CompositeTask):
CompositeTask.__init__(self, **kwargs)
self._fetcher_proc = _EbuildFetcherProcess(**kwargs)
 
-   def already_fetched(self, settings):
+   def async_already_fetched(self, settings):
"""
Returns True if all files already exist locally and have correct
digests, otherwise return False. When returning True, 
appropriate
@@ -38,7 +38,7 @@ class EbuildFetcher(CompositeTask):
such messages. This will raise InvalidDependString if SRC_URI is
invalid.
"""
-   return self._fetcher_proc.already_fetched(settings)
+   return self._fetcher_proc.async_already_fetched(settings)
 
def _start(self):
self._start_task(
@@ -68,11 +68,29 @@ class _EbuildFetcherProcess(ForkProcess):
__slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall",
"pkg", "prefetch", "_digests", "_manifest", "_settings", 
"_uri_map")
 
-   def already_fetched(self, settings):
-   uri_map = 
self.scheduler.run_until_complete(self._async_uri_map())
-   if not uri_map:
-   return True
+   def async_already_fetched(self, settings):
+   result = self.scheduler.create_future()
+
+   def uri_map_done(uri_map_future):
+   if uri_map_future.exception() is not None or 
result.cancelled():
+   if not result.cancelled():
+   
result.set_exception(uri_map_future.exception())
+   return
+
+   uri_map = uri_map_future.result()
+   if uri_map:
+   result.set_result(
+   self._check_already_fetched(settings, 
uri_map))
+   else:
+   result.set_result(True)
+
+   uri_map_future = self._async_uri_map()
+   result.add_done_callback(lambda result:
+   uri_map_future.cancel() if result.cancelled() else None)
+   uri_map_future.add_done_callback(uri_map_done)
+   return result
 
+   def _check_already_fetched(self, settings, uri_map):
digests = self._get_digests()
distdir = settings["DISTDIR"]
allow_missing = self._get_manifest().allow_missing
-- 
2.13.6




[gentoo-portage-dev] [PATCH 0/5] EbuildFetcher._get_uri_map(): fix event loop recursion (bug 653810)

2018-04-22 Thread Zac Medico
Bug: https://bugs.gentoo.org/653810

Zac Medico (5):
  portdbapi: add async_fetch_map method (bug 653810)
  EbuildFetcher: inherit CompositeTask (bug 653810)
  EbuildFetcher: add _async_uri_map method (bug 653810)
  EbuildFetcher: use _async_uri_map in _start (bug 653810)
  EbuildFetcher: add async_already_fetched method (bug 653810)

 pym/_emerge/EbuildBuild.py|   8 +++-
 pym/_emerge/EbuildFetcher.py  | 105 --
 pym/portage/dbapi/porttree.py |  75 +++---
 3 files changed, 146 insertions(+), 42 deletions(-)

-- 
2.13.6