Re: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)

2018-02-25 Thread Zac Medico
On 02/25/2018 07:17 PM, Alec Warner wrote:
> 
> 
> On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico  > wrote:
> 
> The iter_completed function is similar to asyncio.as_completed, but
> takes an iterator of futures as input, and includes support for
> max_jobs and max_load parameters. The default values for max_jobs
> and max_load correspond to multiprocessing.cpu_count().
> 
> Example usage for async_aux_get:
> 
>   import portage
>   from portage.util.futures.iter_completed import iter_completed
> 
>   portdb = portage.portdb
>   future_cpv = {}
> 
> 
> I'm not sure I grasp the purpose of this dict, can't we just modify the
> async aux get to return the cpv from the future?

If we do that then we should probably return all of the other aux_get
inputs too, including mylist, mytree, and myrepo. Pretty soon it feels
like there's a lot of clutter here. If we leave the burden to the
caller, then the API is simpler, and it's not much of a burden to the
caller anyway.

>   def future_generator():
>     for cpv in portdb.cp_list('sys-apps/portage'):
>       future = portdb.async_aux_get(cpv, portage.auxdbkeys)
>       future_cpv[id(future)] = cpv
>       yield future
> 
> 
> for cpv in portdb.cp_list('...'):
>    yield portdb.async_aux_get(cpv, portage.auxdbkeys)
>  
> 
>   for future in iter_completed(future_generator()):
>     cpv = future_cpv.pop(id(future))
>     try:
>       result = future.result()
>     except KeyError as e:
>       # aux_get failed
>       print('error:', cpv, e)
>     else:
>       print(cpv, result)
> 
> 
> for future in iter_completed(future_generator()):
>   try:
>     cpv, result = future.result() 
>   except KeyError as e:
>     print('error', cpv, e)
>  
> 
> Or do we expect callers to need other things to key off of in this API?

Yeah it's complicated because of the number of input arguments to
aux_get. You can have the same cpv existing in multiple repos. It's so
much simpler to let the caller manage the mapping from input arguments
to future instance.
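Here is a minimal sketch of the caller-managed mapping. It uses plain asyncio rather than portage. The names fake_async_aux_get, the repo names, and the metadata values are hypothetical stand-ins, not portage APIs. Mapping each future back to its (cpv, repo) inputs keeps two copies of the same cpv apart, and the future never has to echo its arguments. As in the patch's example, the mapping is keyed on id(future). That is only safe while the futures stay alive, which the futures list guarantees here.

```python
import asyncio

# Hypothetical metadata: the same cpv exists in two repos with different values.
FAKE_METADATA = {
    ("sys-apps/portage-2.3.24", "gentoo"): {"EAPI": "6"},
    ("sys-apps/portage-2.3.24", "my-overlay"): {"EAPI": "7"},
}


def fake_async_aux_get(loop, cpv, repo):
    """Hypothetical stand-in for an async aux_get call; returns a Future."""
    future = loop.create_future()
    metadata = FAKE_METADATA.get((cpv, repo))
    if metadata is None:
        loop.call_soon(future.set_exception, KeyError(cpv))
    else:
        loop.call_soon(future.set_result, metadata)
    return future


def collect(requests):
    """Map each (cpv, repo) request to its metadata or to its KeyError."""
    loop = asyncio.new_event_loop()
    try:
        futures = []
        future_inputs = {}  # id(future) -> (cpv, repo), owned by the caller
        for cpv, repo in requests:
            future = fake_async_aux_get(loop, cpv, repo)
            # Keeping every future referenced keeps its id() unique.
            futures.append(future)
            future_inputs[id(future)] = (cpv, repo)

        done, _ = loop.run_until_complete(asyncio.wait(futures))
        results = {}
        for future in done:
            cpv, repo = future_inputs.pop(id(future))
            try:
                results[(cpv, repo)] = future.result()
            except KeyError as e:
                results[(cpv, repo)] = e
        return results
    finally:
        loop.close()


results = collect([
    ("sys-apps/portage-2.3.24", "gentoo"),
    ("sys-apps/portage-2.3.24", "my-overlay"),
    ("sys-apps/portage-9999", "gentoo"),
])
print(results[("sys-apps/portage-2.3.24", "my-overlay")])  # {'EAPI': '7'}
```

To add mylist or mytree, the caller only extends the tuple it stores. The future's result type stays the same.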
-- 
Thanks,
Zac





Re: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)

2018-02-25 Thread Alec Warner
On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico  wrote:

> The iter_completed function is similar to asyncio.as_completed, but
> takes an iterator of futures as input, and includes support for
> max_jobs and max_load parameters. The default values for max_jobs
> and max_load correspond to multiprocessing.cpu_count().
>
> Example usage for async_aux_get:
>
>   import portage
>   from portage.util.futures.iter_completed import iter_completed
>
>   portdb = portage.portdb
>   future_cpv = {}
>

I'm not sure I grasp the purpose of this dict, can't we just modify the
async aux get to return the cpv from the future?


>
>   def future_generator():
>     for cpv in portdb.cp_list('sys-apps/portage'):
>       future = portdb.async_aux_get(cpv, portage.auxdbkeys)
>       future_cpv[id(future)] = cpv
>       yield future
>
>
for cpv in portdb.cp_list('...'):
   yield portdb.async_aux_get(cpv, portage.auxdbkeys)


>   for future in iter_completed(future_generator()):
>     cpv = future_cpv.pop(id(future))
>     try:
>       result = future.result()
>     except KeyError as e:
>       # aux_get failed
>       print('error:', cpv, e)
>     else:
>       print(cpv, result)
>

for future in iter_completed(future_generator()):
  try:
    cpv, result = future.result()
  except KeyError as e:
    print('error', cpv, e)


Or do we expect callers to need other things to key off of in this API?
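The alternative, where each future resolves to a value that already carries its cpv, can also be sketched with plain asyncio. One detail of the snippet above matters here. If future.result() raises, the tuple is never unpacked, so the except branch does not get a cpv bound from that future. Instead cpv is either undefined or left over from an earlier iteration. The sketch below therefore catches KeyError inside a wrapper coroutine and returns it alongside the cpv. The names fake_aux_get and aux_get_with_cpv are hypothetical, and this is not portage's async_aux_get.

```python
import asyncio


async def fake_aux_get(cpv):
    """Hypothetical stand-in for an aux_get lookup that may raise KeyError."""
    await asyncio.sleep(0)
    if cpv.endswith("-9999"):
        raise KeyError(cpv)
    return {"EAPI": "6"}


async def aux_get_with_cpv(cpv):
    """Resolve to (cpv, metadata, error) so the cpv survives a failure."""
    try:
        return cpv, await fake_aux_get(cpv), None
    except KeyError as e:
        return cpv, None, e


async def main(cpvs):
    tasks = [asyncio.create_task(aux_get_with_cpv(cpv)) for cpv in cpvs]
    results = {}
    # Every completed awaitable yields a self-describing tuple,
    # so no side dict is needed to recover the cpv.
    for next_done in asyncio.as_completed(tasks):
        cpv, metadata, error = await next_done
        if error is not None:
            print("error", cpv, error)
        results[cpv] = error if error is not None else metadata
    return results


results = asyncio.run(main(["sys-apps/portage-2.3.24", "sys-apps/portage-9999"]))
```

This style suits asyncio.as_completed. Iterated with a plain for loop, as_completed yields new awaitables rather than the original futures, so an id(future) mapping would not work there. The cost is the one raised in the reply: every input the caller might need (mylist, mytree, myrepo) has to be threaded through the result.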

-A


> See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
> Bug: https://bugs.gentoo.org/648790
> ---
>  .../tests/util/futures/test_iter_completed.py  | 50 
>  pym/portage/util/_async/FuturePollTask.py  | 27 ++
>  pym/portage/util/futures/iter_completed.py | 63 ++
>  pym/portage/util/futures/wait.py   | 95 ++
>  4 files changed, 235 insertions(+)
>  create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
>  create mode 100644 pym/portage/util/_async/FuturePollTask.py
>  create mode 100644 pym/portage/util/futures/iter_completed.py
>  create mode 100644 pym/portage/util/futures/wait.py
>
> diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
> new file mode 100644
> index 0..6607d871c
> --- /dev/null
> +++ b/pym/portage/tests/util/futures/test_iter_completed.py
> @@ -0,0 +1,50 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import time
> +from portage.tests import TestCase
> +from portage.util._async.ForkProcess import ForkProcess
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +from portage.util.futures.iter_completed import iter_completed
> +
> +
> +class SleepProcess(ForkProcess):
> +	__slots__ = ('future', 'seconds')
> +	def _start(self):
> +		self.addExitListener(self._future_done)
> +		ForkProcess._start(self)
> +
> +	def _future_done(self, task):
> +		self.future.set_result(self.seconds)
> +
> +	def _run(self):
> +		time.sleep(self.seconds)
> +
> +
> +class IterCompletedTestCase(TestCase):
> +
> +	def testIterCompleted(self):
> +
> +		# Mark this as todo, since we don't want to fail if heavy system
> +		# load causes the tasks to finish in an unexpected order.
> +		self.todo = True
> +
> +		loop = global_event_loop()
> +		tasks = [
> +			SleepProcess(seconds=0.200),
> +			SleepProcess(seconds=0.100),
> +			SleepProcess(seconds=0.001),
> +		]
> +
> +		expected_order = sorted(task.seconds for task in tasks)
> +
> +		def future_generator():
> +			for task in tasks:
> +				task.future = loop.create_future()
> +				task.scheduler = loop
> +				task.start()
> +				yield task.future
> +
> +		for seconds, future in zip(expected_order, iter_completed(future_generator(),
> +			max_jobs=None, max_load=None, loop=loop)):
> +			self.assertEqual(seconds, future.result())
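The test relies on iter_completed yielding futures in completion order while pulling them lazily from a generator. The patch's own implementation is not shown in this excerpt. What follows is a rough asyncio-only sketch of the same behavior, with these assumptions:

* iter_completed_sketch is a hypothetical name.
* It honors only max_jobs, not max_load.
* It drives the loop synchronously by calling run_until_complete() around asyncio.wait(..., return_when=FIRST_COMPLETED). Unlike as_completed, asyncio.wait returns the original future objects.

```python
import asyncio
import itertools


def iter_completed_sketch(futures, max_jobs, loop):
    """Hypothetical sketch: yield futures from `futures` as they complete.

    At most `max_jobs` futures are pulled from the iterator and left
    pending at once; the event loop is driven synchronously.
    """
    futures = iter(futures)
    pending = set()
    while True:
        # Top up the pending set lazily from the input iterator.
        pending.update(itertools.islice(futures, max_jobs - len(pending)))
        if not pending:
            return
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED))
        yield from done


def future_generator(loop, delays):
    # Like the SleepProcess test: each future resolves to its own delay.
    for seconds in delays:
        future = loop.create_future()
        loop.call_later(seconds, future.set_result, seconds)
        yield future


loop = asyncio.new_event_loop()
try:
    order = [f.result() for f in iter_completed_sketch(
        future_generator(loop, [0.200, 0.100, 0.001]), max_jobs=3, loop=loop)]
    serial = [f.result() for f in iter_completed_sketch(
        future_generator(loop, [0.200, 0.100, 0.001]), max_jobs=1, loop=loop)]
finally:
    loop.close()
print(order)   # completion order: shortest delay first
print(serial)  # [0.2, 0.1, 0.001]
```

With max_jobs=1 only one future is ever pending, so results come back in input order. According to the patch description, the real defaults for max_jobs and max_load correspond to multiprocessing.cpu_count().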
> diff --git a/pym/portage/util/_async/FuturePollTask.py b/pym/portage/util/_async/FuturePollTask.py
> new file mode 100644
> index 0..6b7cdf7d5
> --- /dev/null
> +++ b/pym/portage/util/_async/FuturePollTask.py
> @@ -0,0 +1,27 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import os
> +import signal
> +
> +from _emerge.AbstractPollTask import AbstractPollTask
> +
> +
> +class FuturePollTask(AbstractPollTask):
> +   """
> +   Wraps a Future in an AsynchronousTask, which is useful for
> +   scheduling with TaskScheduler.
> +   """
> +