Re: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)

2018-02-25 Thread Zac Medico
On 02/25/2018 07:17 PM, Alec Warner wrote:
> 
> 
> On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico wrote:
> 
> The iter_completed function is similar to asyncio.as_completed, but
> takes an iterator of futures as input, and includes support for
> max_jobs and max_load parameters. The default values for max_jobs
> and max_load correspond to multiprocessing.cpu_count().
> 
> Example usage for async_aux_get:
> 
>   import portage
>   from portage.util.futures.iter_completed import iter_completed
> 
>   portdb = portage.portdb
>   future_cpv = {}
> 
> 
> I'm not sure I grasp the purpose of this dict, can't we just modify the
> async aux get to return the cpv from the future?

If we do that then we should probably return all of the other aux_get
inputs too, including mylist, mytree, and myrepo. Pretty soon it feels
like there's a lot of clutter here. If we leave the burden to the
caller, then the API is simpler, and it's not much of a burden to the
caller anyway.

>   def future_generator():
>     for cpv in portdb.cp_list('sys-apps/portage'):
>       future = portdb.async_aux_get(cpv, portage.auxdbkeys)
>       future_cpv[id(future)] = cpv
>       yield future
> 
> 
> for cpv in portdb.cp_list('...'):
>    yield portdb.async_aux_get(cpv, portage.auxdbkeys)
>  
> 
>   for future in iter_completed(future_generator()):
>     cpv = future_cpv.pop(id(future))
>     try:
>       result = future.result()
>     except KeyError as e:
>       # aux_get failed
>       print('error:', cpv, e)
>     else:
>       print(cpv, result)
> 
> 
> for future in iter_completed(future_generator()):
>   try:
>     cpv, result = future.result() 
>   except KeyError as e:
>     print('error', cpv, e)
>  
> 
> Or do we expect callers to need other things to key off of in this API?

Yeah it's complicated because of the number of input arguments to
aux_get. You can have the same cpv existing in multiple repos. It's so
much simpler to let the caller manage the mapping from input arguments
to future instance.
-- 
Thanks,
Zac



signature.asc
Description: OpenPGP digital signature


Re: [gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)

2018-02-25 Thread Alec Warner
On Sun, Feb 25, 2018 at 8:50 PM, Zac Medico wrote:

> The iter_completed function is similar to asyncio.as_completed, but
> takes an iterator of futures as input, and includes support for
> max_jobs and max_load parameters. The default values for max_jobs
> and max_load correspond to multiprocessing.cpu_count().
>
> Example usage for async_aux_get:
>
>   import portage
>   from portage.util.futures.iter_completed import iter_completed
>
>   portdb = portage.portdb
>   future_cpv = {}
>

I'm not sure I grasp the purpose of this dict, can't we just modify the
async aux get to return the cpv from the future?


>
>   def future_generator():
>     for cpv in portdb.cp_list('sys-apps/portage'):
>       future = portdb.async_aux_get(cpv, portage.auxdbkeys)
>       future_cpv[id(future)] = cpv
>       yield future
>
>
for cpv in portdb.cp_list('...'):
   yield portdb.async_aux_get(cpv, portage.auxdbkeys)


>   for future in iter_completed(future_generator()):
>     cpv = future_cpv.pop(id(future))
>     try:
>       result = future.result()
>     except KeyError as e:
>       # aux_get failed
>       print('error:', cpv, e)
>     else:
>       print(cpv, result)
>

for future in iter_completed(future_generator()):
  try:
    cpv, result = future.result()
  except KeyError as e:
    print('error', cpv, e)


Or do we expect callers to need other things to key off of in this API?

-A


> See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
> Bug: https://bugs.gentoo.org/648790
> ---
>  .../tests/util/futures/test_iter_completed.py  | 50 
>  pym/portage/util/_async/FuturePollTask.py  | 27 ++
>  pym/portage/util/futures/iter_completed.py | 63 ++
>  pym/portage/util/futures/wait.py   | 95 ++
>  4 files changed, 235 insertions(+)
>  create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
>  create mode 100644 pym/portage/util/_async/FuturePollTask.py
>  create mode 100644 pym/portage/util/futures/iter_completed.py
>  create mode 100644 pym/portage/util/futures/wait.py
>
> diff --git a/pym/portage/tests/util/futures/test_iter_completed.py
> b/pym/portage/tests/util/futures/test_iter_completed.py
> new file mode 100644
> index 0..6607d871c
> --- /dev/null
> +++ b/pym/portage/tests/util/futures/test_iter_completed.py
> @@ -0,0 +1,50 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import time
> +from portage.tests import TestCase
> +from portage.util._async.ForkProcess import ForkProcess
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +from portage.util.futures.iter_completed import iter_completed
> +
> +
> +class SleepProcess(ForkProcess):
> +   __slots__ = ('future', 'seconds')
> +   def _start(self):
> +   self.addExitListener(self._future_done)
> +   ForkProcess._start(self)
> +
> +   def _future_done(self, task):
> +   self.future.set_result(self.seconds)
> +
> +   def _run(self):
> +   time.sleep(self.seconds)
> +
> +
> +class IterCompletedTestCase(TestCase):
> +
> +   def testIterCompleted(self):
> +
> +   # Mark this as todo, since we don't want to fail if heavy
> system
> +   # load causes the tasks to finish in an unexpected order.
> +   self.todo = True
> +
> +   loop = global_event_loop()
> +   tasks = [
> +   SleepProcess(seconds=0.200),
> +   SleepProcess(seconds=0.100),
> +   SleepProcess(seconds=0.001),
> +   ]
> +
> +   expected_order = sorted(task.seconds for task in tasks)
> +
> +   def future_generator():
> +   for task in tasks:
> +   task.future = loop.create_future()
> +   task.scheduler = loop
> +   task.start()
> +   yield task.future
> +
> +   for seconds, future in zip(expected_order,
> iter_completed(future_generator(),
> +   max_jobs=None, max_load=None, loop=loop)):
> +   self.assertEqual(seconds, future.result())
> diff --git a/pym/portage/util/_async/FuturePollTask.py
> b/pym/portage/util/_async/FuturePollTask.py
> new file mode 100644
> index 0..6b7cdf7d5
> --- /dev/null
> +++ b/pym/portage/util/_async/FuturePollTask.py
> @@ -0,0 +1,27 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import os
> +import signal
> +
> +from _emerge.AbstractPollTask import AbstractPollTask
> +
> +
> +class FuturePollTask(AbstractPollTask):
> +   """
> +   Wraps a Future in an AsynchronousTask, which is useful for
> +   scheduling with TaskScheduler.
> +   """
> +

[gentoo-portage-dev] [PATCH 2/2] Add iter_completed convenience function (bug 648790)

2018-02-25 Thread Zac Medico
The iter_completed function is similar to asyncio.as_completed, but
takes an iterator of futures as input, and includes support for
max_jobs and max_load parameters. The default values for max_jobs
and max_load correspond to multiprocessing.cpu_count().

Example usage for async_aux_get:

  import portage
  from portage.util.futures.iter_completed import iter_completed

  portdb = portage.portdb
  future_cpv = {}

  def future_generator():
    for cpv in portdb.cp_list('sys-apps/portage'):
      future = portdb.async_aux_get(cpv, portage.auxdbkeys)
      future_cpv[id(future)] = cpv
      yield future

  for future in iter_completed(future_generator()):
    cpv = future_cpv.pop(id(future))
    try:
      result = future.result()
    except KeyError as e:
      # aux_get failed
      print('error:', cpv, e)
    else:
      print(cpv, result)

See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
Bug: https://bugs.gentoo.org/648790
---
 .../tests/util/futures/test_iter_completed.py  | 50 
 pym/portage/util/_async/FuturePollTask.py  | 27 ++
 pym/portage/util/futures/iter_completed.py | 63 ++
 pym/portage/util/futures/wait.py   | 95 ++
 4 files changed, 235 insertions(+)
 create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
 create mode 100644 pym/portage/util/_async/FuturePollTask.py
 create mode 100644 pym/portage/util/futures/iter_completed.py
 create mode 100644 pym/portage/util/futures/wait.py

diff --git a/pym/portage/tests/util/futures/test_iter_completed.py 
b/pym/portage/tests/util/futures/test_iter_completed.py
new file mode 100644
index 0..6607d871c
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -0,0 +1,50 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import time
+from portage.tests import TestCase
+from portage.util._async.ForkProcess import ForkProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.iter_completed import iter_completed
+
+
+class SleepProcess(ForkProcess):
+   __slots__ = ('future', 'seconds')
+   def _start(self):
+   self.addExitListener(self._future_done)
+   ForkProcess._start(self)
+
+   def _future_done(self, task):
+   self.future.set_result(self.seconds)
+
+   def _run(self):
+   time.sleep(self.seconds)
+
+
+class IterCompletedTestCase(TestCase):
+
+   def testIterCompleted(self):
+
+   # Mark this as todo, since we don't want to fail if heavy system
+   # load causes the tasks to finish in an unexpected order.
+   self.todo = True
+
+   loop = global_event_loop()
+   tasks = [
+   SleepProcess(seconds=0.200),
+   SleepProcess(seconds=0.100),
+   SleepProcess(seconds=0.001),
+   ]
+
+   expected_order = sorted(task.seconds for task in tasks)
+
+   def future_generator():
+   for task in tasks:
+   task.future = loop.create_future()
+   task.scheduler = loop
+   task.start()
+   yield task.future
+
+   for seconds, future in zip(expected_order, 
iter_completed(future_generator(),
+   max_jobs=None, max_load=None, loop=loop)):
+   self.assertEqual(seconds, future.result())
diff --git a/pym/portage/util/_async/FuturePollTask.py 
b/pym/portage/util/_async/FuturePollTask.py
new file mode 100644
index 0..6b7cdf7d5
--- /dev/null
+++ b/pym/portage/util/_async/FuturePollTask.py
@@ -0,0 +1,27 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+import signal
+
+from _emerge.AbstractPollTask import AbstractPollTask
+
+
+class FuturePollTask(AbstractPollTask):
+   """
+   Wraps a Future in an AsynchronousTask, which is useful for
+   scheduling with TaskScheduler.
+   """
+   __slots__ = ('future',)
+   def _start(self):
+   self.future.add_done_callback(self._done_callback)
+
+   def _done_callback(self, future):
+   if future.cancelled():
+   self.cancelled = True
+   self.returncode = -signal.SIGINT
+   elif future.exception() is None:
+   self.returncode = os.EX_OK
+   else:
+   self.returncode = 1
+   self.wait()
diff --git a/pym/portage/util/futures/iter_completed.py 
b/pym/portage/util/futures/iter_completed.py
new file mode 100644
index 0..0540cc986
--- /dev/null
+++ b/pym/portage/util/futures/iter_completed.py
@@ -0,0 +1,63 @@
+# Copyright 2018 Gentoo