Hello community,
here is the log from the commit of package python3-jupyter_ipyparallel for openSUSE:Factory checked in at 2016-11-13 22:50:55
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python3-jupyter_ipyparallel (Old)
and /work/SRC/openSUSE:Factory/.python3-jupyter_ipyparallel.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python3-jupyter_ipyparallel"
Changes:
--------
--- /work/SRC/openSUSE:Factory/python3-jupyter_ipyparallel/python3-jupyter_ipyparallel-doc.changes  2016-08-05 18:16:17.000000000 +0200
+++ /work/SRC/openSUSE:Factory/.python3-jupyter_ipyparallel.new/python3-jupyter_ipyparallel-doc.changes  2016-11-13 22:50:56.000000000 +0100
@@ -1,0 +2,14 @@
+Sun Oct 2 17:14:20 UTC 2016 - [email protected]
+
+- update to version 5.2.0:
+ * Fix compatibility with changes in ipykernel 4.3, 4.4
+ * Improve inspection of "@remote" decorated functions
+ * :meth:`Client.wait` accepts any Future.
+ * Add "--user" flag to :command:`ipcluster nbextension`
+ * Default to one core per worker in
+ :meth:`Client.become_distributed`. Override by specifying
+ `ncores` keyword-argument.
+ * Subprocess logs are no longer sent to files by default in
+ :command:`ipcluster`.
+
+-------------------------------------------------------------------
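
The :meth:`Client.wait` change above means wait() now accepts any object
with an ``add_done_callback`` method, including a plain
:class:`concurrent.futures.Future`. A minimal sketch mirroring the upstream
test (assumes a running cluster):

    from concurrent.futures import Future
    from threading import Thread
    import time

    import ipyparallel as ipp

    rc = ipp.Client()      # connect to a running cluster
    f = Future()

    def finish_later():
        time.sleep(0.1)
        f.set_result('future')

    Thread(target=finish_later).start()
    assert rc.wait([f])    # returns True once the Future resolves
    assert f.result() == 'future'
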
python3-jupyter_ipyparallel.changes: same change
Old:
----
ipyparallel-5.1.1.tar.gz
New:
----
ipyparallel-5.2.0.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python3-jupyter_ipyparallel-doc.spec ++++++
--- /var/tmp/diff_new_pack.4a5HWE/_old 2016-11-13 22:50:58.000000000 +0100
+++ /var/tmp/diff_new_pack.4a5HWE/_new 2016-11-13 22:50:58.000000000 +0100
@@ -24,7 +24,7 @@
%endif
Name: python3-jupyter_ipyparallel-doc
-Version: 5.1.1
+Version: 5.2.0
Release: 0
Summary: Documentation for python-jupyter_ipyparallel
License: BSD-3-Clause
++++++ python3-jupyter_ipyparallel.spec ++++++
--- /var/tmp/diff_new_pack.4a5HWE/_old 2016-11-13 22:50:58.000000000 +0100
+++ /var/tmp/diff_new_pack.4a5HWE/_new 2016-11-13 22:50:58.000000000 +0100
@@ -17,7 +17,7 @@
Name: python3-jupyter_ipyparallel
-Version: 5.1.1
+Version: 5.2.0
Release: 0
Summary: Interactive Parallel Computing with IPython
License: BSD-3-Clause
++++++ ipyparallel-5.1.1.tar.gz -> ipyparallel-5.2.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/PKG-INFO new/ipyparallel-5.2.0/PKG-INFO
--- old/ipyparallel-5.1.1/PKG-INFO 2016-06-24 16:15:33.000000000 +0200
+++ new/ipyparallel-5.2.0/PKG-INFO 2016-08-17 21:49:40.000000000 +0200
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: ipyparallel
-Version: 5.1.1
+Version: 5.2.0
Summary: Interactive Parallel Computing with IPython
Home-page: http://ipython.org
Author: IPython Development Team
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/README.md new/ipyparallel-5.2.0/README.md
--- old/ipyparallel-5.1.1/README.md 2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/README.md 2016-08-14 19:49:09.000000000 +0200
@@ -1,6 +1,12 @@
# Interactive Parallel Computing with IPython
-ipyparallel is the new home of IPython.parallel.
+ipyparallel is the new home of IPython.parallel. ipyparallel is a Python package and collection of CLI scripts for controlling clusters for Jupyter.
+
+ipyparallel contains the following CLI scripts:
+
+* ipcluster - start/stop a cluster
+* ipcontroller - start a scheduler
+* ipengine - start an engine
## Install
@@ -17,10 +23,17 @@
ipcluster nbextension disable
-
See the [documentation on configuring the notebook server](https://jupyter-notebook.readthedocs.org/en/latest/public_server.html) to find your config or setup your initial `jupyter_notebook_config.py`.
+### JupyterHub Install
+
+To install for all users on JupyterHub, as root:
+
+ jupyter nbextension install --sys-prefix --py ipyparallel
+ jupyter nbextension enable --sys-prefix --py ipyparallel
+ jupyter serverextension enable --sys-prefix --py ipyparallel
+
## Run
Start a cluster:
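
A minimal usage sketch for the commands above (assumes a cluster already
started with ``ipcluster start -n 4``):

    import ipyparallel as ipp

    rc = ipp.Client()                        # connect to the running cluster
    view = rc[:]                             # DirectView on all engines
    print(view.apply_sync(lambda: 'hello'))  # run a function on every engine
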
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/docs/source/changelog.rst new/ipyparallel-5.2.0/docs/source/changelog.rst
--- old/ipyparallel-5.1.1/docs/source/changelog.rst  2016-06-24 16:15:05.000000000 +0200
+++ new/ipyparallel-5.2.0/docs/source/changelog.rst  2016-08-17 21:36:01.000000000 +0200
@@ -4,6 +4,18 @@
===========================
+5.2
+---
+
+- Fix compatibility with changes in ipykernel 4.3, 4.4
+- Improve inspection of ``@remote`` decorated functions
+- :meth:`Client.wait` accepts any Future.
+- Add ``--user`` flag to :command:`ipcluster nbextension`
+- Default to one core per worker in :meth:`Client.become_distributed`.
+ Override by specifying `ncores` keyword-argument.
+- Subprocess logs are no longer sent to files by default in :command:`ipcluster`.
+
+
5.1
---
@@ -47,13 +59,6 @@
`Slurm <https://computing.llnl.gov/tutorials/linux_clusters>`_ support is added to ipcluster.
-5.1.1
-~~~~~
-
-5.1.1 fixes iteration through AsyncResults, which was broken in some instances by 5.0.
-
-`5.1.1 on GitHub <https://github.com/ipython/ipyparallel/milestones/5.1.1>`__
-
5.1.0
~~~~~
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/docs/source/demos.rst new/ipyparallel-5.2.0/docs/source/demos.rst
--- old/ipyparallel-5.1.1/docs/source/demos.rst  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/docs/source/demos.rst  2016-08-14 19:49:09.000000000 +0200
@@ -100,11 +100,12 @@
The overall idea of the calculation is simple: each IPython engine will
compute the two digit counts for the digits in a single file. Then in a final
step the counts from each engine will be added up. To perform this
-calculation, we will need two top-level functions from :file:`pidigits.py`:
+calculation, we will need two top-level functions from :file:`pidigits.py`,
+:func:`compute_two_digit_freqs` and :func:`reduce_freqs`:
.. literalinclude:: ../../examples/pi/pidigits.py
:language: python
- :lines: 47-62
+ :lines: 52-67
We will also use the :func:`plot_two_digit_freqs` function to plot the
results. The code to run this calculation in parallel is contained in
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/_version.py new/ipyparallel-5.2.0/ipyparallel/_version.py
--- old/ipyparallel-5.1.1/ipyparallel/_version.py  2016-06-24 16:15:05.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/_version.py  2016-08-17 21:36:34.000000000 +0200
@@ -1,2 +1,2 @@
-version_info = (5, 1, 1)
+version_info = (5, 2, 0)
__version__ = '.'.join(map(str, version_info))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/ipclusterapp.py new/ipyparallel-5.2.0/ipyparallel/apps/ipclusterapp.py
--- old/ipyparallel-5.1.1/ipyparallel/apps/ipclusterapp.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/apps/ipclusterapp.py  2016-08-08 14:08:17.000000000 +0200
@@ -290,7 +290,7 @@
early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
_stopping = False
-
+
aliases = Dict(engine_aliases)
flags = Dict(engine_flags)
@@ -325,7 +325,7 @@
if self.engine_launcher.running:
self.log.info("Engines appear to have started successfully")
self.early_shutdown = 0
-
+
def start_engines(self):
# Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
n = getattr(self.engine_launcher, 'engine_count', self.n)
@@ -343,21 +343,21 @@
if self.early_shutdown and not self._stopping:
self.log.error("""
Engines shutdown early, they probably failed to connect.
-
+
Check the engine log files for output.
-
+
If your controller and engines are not on the same machine, you probably
have to instruct the controller to listen on an interface other than localhost.
-
+
You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
-
+
Be sure to read our security docs before instructing your controller to listen on
a public interface.
""")
self.stop_launchers()
-
+
return self.engines_stopped(r)
-
+
def engines_stopped(self, r):
return self.loop.stop()
@@ -402,7 +402,7 @@
if self.daemonize:
if os.name=='posix':
daemonize()
-
+
self.loop.add_callback(self.start_engines)
# Now write the new pid file AFTER our new forked pid is active.
# self.write_pid_file()
@@ -443,7 +443,7 @@
delay = CFloat(1., config=True,
help="delay (in s) between starting the controller and the engines")
-
+
controller_ip = Unicode(config=True, help="Set the IP address of the controller.")
controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
def _controller_launcher_changed(self, name, old, new):
@@ -459,7 +459,7 @@
Each launcher class has its own set of configuration options, for making sure
it will work in your environment.
-
+
Note that using a batch launcher for the controller *does not* put it
in the same batch job as the engines, so they will still start separately.
@@ -498,11 +498,11 @@
if self.controller_ip:
self.controller_launcher.controller_args.append('--ip=%s' % self.controller_ip)
self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
-
+
def engines_stopped(self, r):
"""prevent parent.engines_stopped from stopping everything on engine
shutdown"""
pass
-
+
def start_controller(self):
self.log.info("Starting Controller with %s",
self.controller_launcher_class)
self.controller_launcher.on_stop(self.stop_launchers)
@@ -550,7 +550,7 @@
if self.daemonize:
if os.name=='posix':
daemonize()
-
+
def start():
self.start_controller()
self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines)
@@ -572,24 +572,28 @@
class IPClusterNBExtension(BaseIPythonApplication):
"""Enable/disable ipcluster tab extension in Jupyter notebook"""
-
+
name = 'ipcluster-nbextension'
-
+
description = """Enable/disable IPython clusters tab in Jupyter notebook
-
+
for Jupyter Notebook >= 4.2, you can use the new nbextension API:
-
+
jupyter serverextension enable --py ipyparallel
jupyter nbextension install --py ipyparallel
jupyter nbextension enable --py ipyparallel
"""
-
+
examples = """
ipcluster nbextension enable
ipcluster nbextension disable
"""
version = __version__
-
+ user = Bool(False, help="Apply the operation only for the given user").tag(
+ config=True)
+ flags = Dict({'user': ({'IPClusterNBExtension': {'user': True}},
+ 'Apply the operation only for the given user')})
+
def start(self):
from ipyparallel.nbextension.install import install_extensions
if len(self.extra_args) != 1:
@@ -597,10 +601,10 @@
action = self.extra_args[0].lower()
if action == 'enable':
print("Enabling IPython clusters tab")
- install_extensions(enable=True)
+ install_extensions(enable=True, user=self.user)
elif action == 'disable':
print("Disabling IPython clusters tab")
- install_extensions(enable=False)
+ install_extensions(enable=False, user=self.user)
else:
self.exit("Must specify 'enable' or 'disable', not '%s'" % action)
@@ -626,7 +630,9 @@
def start(self):
if self.subapp is None:
- print("No subcommand specified. Must specify one of:
%s"%(self.subcommands.keys()))
+ keys = ', '.join("'{}'".format(key) for
+ key in self.subcommands.keys())
+ print("No subcommand specified. Must specify one of: %s" % keys)
print()
self.print_description()
self.print_subcommands()
@@ -638,4 +644,3 @@
if __name__ == '__main__':
launch_new_instance()
-
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/ipengineapp.py new/ipyparallel-5.2.0/ipyparallel/apps/ipengineapp.py
--- old/ipyparallel-5.1.1/ipyparallel/apps/ipengineapp.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/apps/ipengineapp.py  2016-08-08 14:08:17.000000000 +0200
@@ -254,7 +254,12 @@
app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
- app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
+ iopub_socket = kernel.iopub_socket
+ # ipykernel 4.3 iopub_socket is an IOThread wrapper:
+ if hasattr(iopub_socket, 'socket'):
+ iopub_socket = iopub_socket.socket
+
+ app.iopub_port = app._bind_socket(iopub_socket, app.iopub_port)
app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/launcher.py new/ipyparallel-5.2.0/ipyparallel/apps/launcher.py
--- old/ipyparallel-5.1.1/ipyparallel/apps/launcher.py  2016-06-22 02:28:45.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/apps/launcher.py  2016-07-06 13:00:57.000000000 +0200
@@ -214,14 +214,14 @@
controller_cmd = List(ipcontroller_cmd_argv, config=True,
help="""Popen command to launch ipcontroller.""")
# Command line arguments to ipcontroller.
- controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
+ controller_args = List(['--log-level=%i' % logging.INFO], config=True,
help="""command-line args to pass to ipcontroller""")
class EngineMixin(ClusterAppMixin):
engine_cmd = List(ipengine_cmd_argv, config=True,
help="""command to launch the Engine.""")
# Command line arguments for ipengine.
- engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
+ engine_args = List(['--log-level=%i' % logging.INFO], config=True,
help="command-line arguments to pass to ipengine"
)
@@ -1195,7 +1195,7 @@
default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
-%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
def start(self):
@@ -1300,7 +1300,7 @@
default_template= Unicode("""#!/bin/sh
#SBATCH --job-name=ipy-controller-{cluster_id}
#SBATCH --ntasks=1
-%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
def start(self):
@@ -1335,7 +1335,7 @@
default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
-%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
+%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
def start(self):
@@ -1400,7 +1400,7 @@
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
- %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
+ %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
def start(self):
@@ -1481,7 +1481,7 @@
executable = ipcontroller
# by default we expect a shared file system
transfer_executable = False
-arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
+arguments = '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""")
def start(self):
@@ -1498,7 +1498,7 @@
executable = ipengine
# by default we expect a shared file system
transfer_executable = False
-arguments = "--log-to-file '--profile-dir={profile_dir}'
'--cluster-id={cluster_id}'"
+arguments = " '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")
@@ -1513,7 +1513,7 @@
ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
help="Popen command for ipcluster")
ipcluster_args = List(
- ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
+ ['--clean-logs=True', '--log-level=%i' % logging.INFO], config=True,
help="Command line arguments to pass to ipcluster.")
ipcluster_subcommand = Unicode('start')
profile = Unicode('default')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/apps/winhpcjob.py new/ipyparallel-5.2.0/ipyparallel/apps/winhpcjob.py
--- old/ipyparallel-5.1.1/ipyparallel/apps/winhpcjob.py  2016-06-22 02:28:45.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/apps/winhpcjob.py  2016-07-06 13:00:57.000000000 +0200
@@ -252,7 +252,7 @@
task_name = Unicode('IPController', config=True)
controller_cmd = List(['ipcontroller.exe'], config=True)
- controller_args = List(['--log-to-file', '--log-level=40'], config=True)
+ controller_args = List(['--log-level=40'], config=True)
# I don't want these to be configurable
std_out_file_path = Unicode('', config=False)
std_err_file_path = Unicode('', config=False)
@@ -280,7 +280,7 @@
task_name = Unicode('IPEngine', config=True)
engine_cmd = List(['ipengine.exe'], config=True)
- engine_args = List(['--log-to-file', '--log-level=40'], config=True)
+ engine_args = List(['--log-level=40'], config=True)
# I don't want these to be configurable
std_out_file_path = Unicode('', config=False)
std_err_file_path = Unicode('', config=False)
@@ -304,17 +304,3 @@
return ' '.join(self.engine_cmd + self.engine_args)
-# j = WinHPCJob(None)
-# j.job_name = 'IPCluster'
-# j.username = 'GNET\\bgranger'
-# j.requested_nodes = 'GREEN'
-#
-# t = WinHPCTask(None)
-# t.task_name = 'Controller'
-# t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
-# t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
-# t.std_out_file_path = 'controller-out.txt'
-# t.std_err_file_path = 'controller-err.txt'
-# t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
-# j.add_task(t)
-
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/asyncresult.py new/ipyparallel-5.2.0/ipyparallel/client/asyncresult.py
--- old/ipyparallel-5.1.1/ipyparallel/client/asyncresult.py  2016-06-24 14:48:07.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/client/asyncresult.py  2016-08-08 14:55:03.000000000 +0200
@@ -386,9 +386,9 @@
evt = Event()
for child in self._children:
self._wait_for_child(child, evt=evt)
- results = child.result()
- error.collect_exceptions([results], self._fname)
- yield results
+ result = child.result()
+ error.collect_exceptions([result], self._fname)
+ yield result
else:
# already done
for r in rlist:
@@ -678,15 +678,11 @@
for use in iterator methods
"""
rlist = child.result()
+ if not isinstance(rlist, list):
+ rlist = [rlist]
error.collect_exceptions(rlist, self._fname)
- try:
- for r in rlist:
- yield r
- except TypeError:
- # flattened, not a list
- # this could get broken by flattened data that returns iterables
- # but most calls to map do not expose the `flatten` argument
- yield rlist
+ for r in rlist:
+ yield r
# asynchronous ordered iterator:
def _ordered_iter(self):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/client.py new/ipyparallel-5.2.0/ipyparallel/client/client.py
--- old/ipyparallel-5.1.1/ipyparallel/client/client.py  2016-06-24 16:15:02.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/client/client.py  2016-08-14 19:49:09.000000000 +0200
@@ -5,6 +5,8 @@
from __future__ import print_function
+import collections
+from concurrent.futures import Future
import os
import json
from threading import Thread, Event, current_thread
@@ -20,7 +22,6 @@
import zmq
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream
-from tornado.concurrent import Future
from tornado.gen import multi_future
from traitlets.config.configurable import MultipleInstanceError
@@ -221,6 +222,11 @@
raise KeyError(key)
+def _is_future(f):
+ """light duck-typing check for Futures"""
+ return hasattr(f, 'add_done_callback')
+
+
class Client(HasTraits):
"""A semi-synchronous client to an IPython parallel cluster
@@ -1112,10 +1118,15 @@
True : when all msg_ids are done
False : timeout reached, some msg_ids still outstanding
"""
+ futures = []
if jobs is None:
- theids = self.outstanding
+ if not self.outstanding:
+ return True
+ # make a copy, so that we aren't passing a mutable collection to _futures_for_msgs
+ theids = set(self.outstanding)
else:
- if isinstance(jobs, string_types + (int, AsyncResult)):
+ if isinstance(jobs, string_types + (int, AsyncResult)) \
+ or not isinstance(jobs, collections.Iterable):
jobs = [jobs]
theids = set()
for job in jobs:
@@ -1125,11 +1136,14 @@
elif isinstance(job, AsyncResult):
theids.update(job.msg_ids)
continue
+ elif _is_future(job):
+ futures.append(job)
+ continue
theids.add(job)
- if not theids.intersection(self.outstanding):
- return True
+ if not futures and not theids.intersection(self.outstanding):
+ return True
- futures = self._futures_for_msgs(theids)
+ futures.extend(self._futures_for_msgs(theids))
return self._await_futures(futures, timeout)
def wait_interactive(self, jobs=None, interval=1., timeout=-1.):
@@ -1307,9 +1321,11 @@
worker_args['ip'] = distributed_info['ip']
worker_args['port'] = distributed_info['port']
worker_args['nanny'] = nanny
+ # set default ncores=1, since that's how an IPython cluster is typically set up.
+ worker_args.setdefault('ncores', 1)
+ dview.apply_sync(util.become_distributed_worker, **worker_args)
# Finally, return an Executor connected to the Scheduler
- dview.apply_sync(util.become_distributed_worker, **worker_args)
executor = distributed.Executor('{ip}:{port}'.format(**distributed_info))
return executor
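
The hunk above makes one core per worker the default; a hedged sketch of
overriding it (assumes a running cluster and the ``distributed`` package):

    import ipyparallel as ipp

    rc = ipp.Client()
    # override the new ncores=1 default: two cores per worker
    executor = rc.become_distributed(ncores=2)
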
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/futures.py new/ipyparallel-5.2.0/ipyparallel/client/futures.py
--- old/ipyparallel-5.1.1/ipyparallel/client/futures.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/client/futures.py  2016-08-14 19:49:09.000000000 +0200
@@ -4,7 +4,7 @@
# Distributed under the terms of the Modified BSD License.
from threading import Event
-from tornado.concurrent import Future
+from concurrent.futures import Future
class MessageFuture(Future):
"""Future class to wrap async messages"""
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/client/remotefunction.py new/ipyparallel-5.2.0/ipyparallel/client/remotefunction.py
--- old/ipyparallel-5.1.1/ipyparallel/client/remotefunction.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/client/remotefunction.py  2016-08-14 19:49:09.000000000 +0200
@@ -12,6 +12,7 @@
from . import map as Map
from .asyncresult import AsyncMapResult
+from IPython.utils.signatures import signature
#-----------------------------------------------------------------------------
# Functions and Decorators
@@ -113,6 +114,22 @@
self.block=block
self.flags=flags
+ # copy function attributes for nicer inspection
+ # of decorated functions
+ self.__name__ = getname(f)
+ if getattr(f, '__doc__', None):
+ self.__doc__ = '{} wrapping:\n{}'.format(
+ self.__class__.__name__, f.__doc__,
+ )
+ if getattr(f, '__signature__', None):
+ self.__signature__ = f.__signature__
+ else:
+ try:
+ self.__signature__ = signature(f)
+ except Exception:
+ # no signature, but that's okay
+ pass
+
def __call__(self, *args, **kwargs):
block = self.view.block if self.block is None else self.block
with self.view.temp_flags(block=block, **self.flags):
@@ -155,7 +172,6 @@
chunksize = None
ordered = None
mapObject = None
- _mapping = False
def __init__(self, view, f, dist='b', block=None, chunksize=None,
ordered=True, **flags):
super(ParallelFunction, self).__init__(view, f, block=block, **flags)
@@ -166,8 +182,11 @@
self.mapObject = mapClass()
@sync_view_results
- def __call__(self, *sequences):
+ def __call__(self, *sequences, **kwargs):
client = self.view.client
+ _mapping = kwargs.pop('__ipp_mapping', False)
+ if kwargs:
+ raise TypeError("Unexpected keyword arguments: %s" % kwargs)
lens = []
maxlen = minlen = -1
@@ -192,7 +211,7 @@
return []
# check that the length of sequences match
- if not self._mapping and minlen != maxlen:
+ if not _mapping and minlen != maxlen:
msg = 'all sequences must have equal length, but have %s' % lens
raise ValueError(msg)
@@ -226,7 +245,7 @@
if sum([len(arg) for arg in args]) == 0:
continue
- if self._mapping:
+ if _mapping:
if sys.version_info[0] >= 3:
f = lambda f, *sequences: list(map(f, *sequences))
else:
@@ -263,12 +282,6 @@
That means it can take generators (will be cast to lists locally),
and mismatched sequence lengths will be padded with None.
"""
- # set _mapping as a flag for use inside self.__call__
- self._mapping = True
- try:
- ret = self(*sequences)
- finally:
- self._mapping = False
- return ret
+ return self(*sequences, __ipp_mapping=True)
__all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
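
A short sketch of the improved inspection of ``@remote``-decorated
functions (mirrors the new test file further below; assumes a running
cluster):

    import ipyparallel as ipp

    rc = ipp.Client()

    @ipp.remote(rc[:], block=True)
    def double(x, y=2):
        """multiply x * y"""
        return x * y

    print(double.__name__)       # 'double'
    print(double.__doc__)        # 'RemoteFunction wrapping:\nmultiply x * y'
    print(double.__signature__)  # (x, y=2)
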
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/controller/hub.py new/ipyparallel-5.2.0/ipyparallel/controller/hub.py
--- old/ipyparallel-5.1.1/ipyparallel/controller/hub.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/controller/hub.py  2016-08-08 14:55:03.000000000 +0200
@@ -426,7 +426,7 @@
b'tracktask': self.save_task_destination,
b'incontrol': _passer,
b'outcontrol': _passer,
- b'iopub': self.save_iopub_message,
+ b'iopub': self.monitor_iopub_message,
}
self.query_handlers = {'queue_request': self.queue_status,
@@ -833,15 +833,27 @@
#--------------------- IOPub Traffic ------------------------------
- def save_iopub_message(self, topics, msg):
- """save an iopub message into the db"""
- # print (topics)
+ def monitor_iopub_message(self, topics, msg):
+ '''intercept iopub traffic so events can be acted upon'''
try:
msg = self.session.deserialize(msg, content=True)
except Exception:
self.log.error("iopub::invalid IOPub message", exc_info=True)
return
+ msg_type = msg['header']['msg_type']
+ if msg_type == 'shutdown_reply':
+ session = msg['header']['session']
+ eid = self.by_ident.get(session, None)
+ uuid = self.engines[eid].uuid
+ self.unregister_engine(ident='shutdown_reply',
+ msg=dict(content=dict(id=eid, queue=uuid)))
+
+ if msg_type not in ('status', 'shutdown_reply', ):
+ self.save_iopub_message(topics, msg)
+
+ def save_iopub_message(self, topics, msg):
+ """save an iopub message into the db"""
parent = msg['parent_header']
if not parent:
self.log.debug("iopub::IOPub message lacks parent: %r", msg)
@@ -862,15 +874,12 @@
name = content['name']
s = '' if rec is None else rec[name]
d[name] = s + content['text']
-
elif msg_type == 'error':
d['error'] = content
elif msg_type == 'execute_input':
d['execute_input'] = content['code']
elif msg_type in ('display_data', 'execute_result'):
d[msg_type] = content
- elif msg_type == 'status':
- pass
elif msg_type == 'data_pub':
self.log.info("ignored data_pub message for %s" % msg_id)
else:
@@ -985,6 +994,11 @@
content=dict(id=eid, uuid=uuid)
self.dead_engines.add(uuid)
+ #stop the heartbeats
+ self.hearts.pop(uuid, None)
+ self.heartmonitor.responses.discard(uuid)
+ self.heartmonitor.hearts.discard(uuid)
+
self.loop.add_timeout(
self.loop.time() + self.registration_timeout,
lambda : self._handle_stranded_msgs(eid, uuid),
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/engine/kernel.py new/ipyparallel-5.2.0/ipyparallel/engine/kernel.py
--- old/ipyparallel-5.1.1/ipyparallel/engine/kernel.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/engine/kernel.py  2016-08-10 16:10:22.000000000 +0200
@@ -3,7 +3,7 @@
import sys
from datetime import datetime
-from ipython_genutils.py3compat import cast_bytes, cast_unicode_py2
+from ipython_genutils.py3compat import cast_bytes, cast_unicode_py2, unicode_type, safe_unicode
from traitlets import Integer, Type
from ipykernel.ipkernel import IPythonKernel
@@ -24,8 +24,7 @@
base = "engine.%s" % self.engine_id
return cast_bytes("%s.%s" % (base, topic))
-
-
+
def __init__(self, **kwargs):
super(IPythonParallelKernel, self).__init__(**kwargs)
# add apply_request, in anticipation of upstream deprecation
@@ -35,7 +34,7 @@
self.shell.configurables.append(data_pub)
data_pub.session = self.session
data_pub.pub_socket = self.iopub_socket
-
+
def init_metadata(self, parent):
"""init metadata dict, for execute/apply_reply"""
return {
@@ -165,21 +164,33 @@
item_threshold=self.session.item_threshold,
)
- except:
+ except BaseException as e:
# invoke IPython traceback formatting
shell.showtraceback()
- # FIXME - fish exception info out of shell, possibly left there by
- # run_code. We'll need to clean up this logic later.
- reply_content = {}
- if shell._reply_content is not None:
- reply_content.update(shell._reply_content)
- # reset after use
- shell._reply_content = None
+ reply_content = {
+ 'traceback': [],
+ 'ename': unicode_type(type(e).__name__),
+ 'evalue': safe_unicode(e),
+ }
+ # get formatted traceback, which ipykernel recorded
+ if hasattr(shell, '_last_traceback'):
+ # ipykernel 4.4
+ reply_content['traceback'] = shell._last_traceback or []
+ elif hasattr(shell, '_reply_content'):
+ # ipykernel <= 4.3
+ if shell._reply_content and 'traceback' in shell._reply_content:
+ reply_content['traceback'] = shell._reply_content['traceback']
+ else:
+ self.log.warning("Didn't find a traceback where I expected to")
+ shell._last_traceback = None
+ e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
+ reply_content['engine_info'] = e_info
self.send_response(self.iopub_socket, u'error', reply_content,
ident=self._topic('error'))
self.log.info("Exception in apply request:\n%s",
'\n'.join(reply_content['traceback']))
result_buf = []
+ reply_content['status'] = 'error'
else:
reply_content = {'status' : 'ok'}
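
The version-compatibility logic in the hunk above, extracted as a
hypothetical standalone helper for clarity (names mirror the diff; nothing
here is new API):

    def last_traceback(shell):
        # ipykernel 4.4 records the formatted traceback here:
        if hasattr(shell, '_last_traceback'):
            return shell._last_traceback or []
        # ipykernel <= 4.3 stashed it in shell._reply_content:
        reply = getattr(shell, '_reply_content', None)
        if reply and 'traceback' in reply:
            return reply['traceback']
        return []
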
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/nbextension/install.py new/ipyparallel-5.2.0/ipyparallel/nbextension/install.py
--- old/ipyparallel-5.1.1/ipyparallel/nbextension/install.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/nbextension/install.py  2016-08-08 14:08:17.000000000 +0200
@@ -8,23 +8,23 @@
from notebook.services.config import ConfigManager as FrontendConfigManager
-def install_extensions(enable=True):
+def install_extensions(enable=True, user=False):
"""Register ipyparallel clusters tab as notebook extensions
-
+
Toggle with enable=True/False.
"""
from distutils.version import LooseVersion as V
import notebook
-
+
if V(notebook.__version__) < V('4.2'):
return _install_extension_nb41(enable)
-
+
from notebook.nbextensions import install_nbextension_python, enable_nbextension, disable_nbextension
from notebook.serverextensions import toggle_serverextension_python
- toggle_serverextension_python('ipyparallel.nbextension')
- install_nbextension_python('ipyparallel')
+ toggle_serverextension_python('ipyparallel.nbextension', user=user)
+ install_nbextension_python('ipyparallel', user=user)
if enable:
- enable_nbextension('tree', 'ipyparallel/main')
+ enable_nbextension('tree', 'ipyparallel/main', user=user)
else:
disable_nbextension('tree', 'ipyparallel/main')
@@ -49,7 +49,7 @@
'server_extensions': server_extensions,
}
})
-
+
# frontend config (*way* easier because it's a dict)
frontend = FrontendConfigManager()
frontend.update('tree', {
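
The same per-user toggle is reachable from Python, equivalent to
``ipcluster nbextension enable --user`` (a sketch; assumes notebook >= 4.2):

    from ipyparallel.nbextension.install import install_extensions

    install_extensions(enable=True, user=True)
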
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/tests/test_asyncresult.py new/ipyparallel-5.2.0/ipyparallel/tests/test_asyncresult.py
--- old/ipyparallel-5.1.1/ipyparallel/tests/test_asyncresult.py  2016-06-24 14:48:07.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/tests/test_asyncresult.py  2016-08-10 16:09:22.000000000 +0200
@@ -91,6 +91,16 @@
for r in ar:
self.assertEqual(r, 0.125)
+ def test_iter_error(self):
+ amr = self.client[:].map_async(lambda x: 1/(x-2), range(5))
+ # iterating through failing AMR should raise RemoteError
+ self.assertRaisesRemote(ZeroDivisionError, list, amr)
+ # so should get
+ self.assertRaisesRemote(ZeroDivisionError, amr.get)
+ amr.wait(10)
+ # test iteration again after everything is local
+ self.assertRaisesRemote(ZeroDivisionError, list, amr)
+
def test_getattr(self):
ar = self.client[:].apply_async(wait, 0.5)
self.assertEqual(ar.engine_id, [None] * len(ar))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/tests/test_client.py new/ipyparallel-5.2.0/ipyparallel/tests/test_client.py
--- old/ipyparallel-5.1.1/ipyparallel/tests/test_client.py  2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/ipyparallel/tests/test_client.py  2016-08-14 19:49:09.000000000 +0200
@@ -5,10 +5,14 @@
from __future__ import division
+from concurrent.futures import Future
from datetime import datetime
import os
+from threading import Thread
import time
+from tornado.concurrent import Future as TornadoFuture
+
from IPython import get_ipython
from ipyparallel.client import client as clientmod
from ipyparallel import error, AsyncHubResult, DirectView, Reference
@@ -525,7 +529,21 @@
ar = self.client[-1].apply_async(lambda : 1)
self.client.wait_interactive()
self.assertEqual(self.client.outstanding, set())
-
+
+ def test_await_future(self):
+ f = Future()
+ tf = TornadoFuture()
+ def finish_later():
+ time.sleep(0.1)
+ f.set_result('future')
+ tf.set_result('tornado')
+ Thread(target=finish_later).start()
+ assert self.client.wait([f, tf])
+ assert f.done()
+ assert tf.done()
+ assert f.result() == 'future'
+ assert tf.result() == 'tornado'
+
@skip_without('distributed')
def test_become_distributed(self):
executor = self.client.become_distributed()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/ipyparallel/tests/test_remotefunction.py new/ipyparallel-5.2.0/ipyparallel/tests/test_remotefunction.py
--- old/ipyparallel-5.1.1/ipyparallel/tests/test_remotefunction.py  1970-01-01 01:00:00.000000000 +0100
+++ new/ipyparallel-5.2.0/ipyparallel/tests/test_remotefunction.py  2016-08-14 19:49:09.000000000 +0200
@@ -0,0 +1,60 @@
+"""Tests for remote functions"""
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+from __future__ import division
+
+
+import ipyparallel as ipp
+
+from .clienttest import ClusterTestCase, add_engines
+
+def setup():
+ add_engines(2, total=True)
+
+class TestRemoteFunctions(ClusterTestCase):
+
+ def test_remote(self):
+ v = self.client[-1]
+ @ipp.remote(v, block=True)
+ def foo(x, y=5):
+ """multiply x * y"""
+ return x * y
+ self.assertEqual(foo.__name__, 'foo')
+ self.assertIn('RemoteFunction', foo.__doc__)
+ self.assertIn('multiply x', foo.__doc__)
+
+ z = foo(5)
+ self.assertEqual(z, 25)
+ z = foo(2, 3)
+ self.assertEqual(z, 6)
+ z = foo(x=5, y=2)
+ self.assertEqual(z, 10)
+
+ def test_parallel(self):
+ n = 2
+ v = self.client[:n]
+ @ipp.parallel(v, block=True)
+ def foo(x):
+ """multiply x * y"""
+ return x * 2
+ self.assertEqual(foo.__name__, 'foo')
+ self.assertIn('ParallelFunction', foo.__doc__)
+ self.assertIn('multiply x', foo.__doc__)
+
+ z = foo([1, 2, 3, 4])
+ self.assertEqual(z, [1, 2, 1, 2, 3, 4, 3, 4])
+
+ def test_parallel_map(self):
+ v = self.client.load_balanced_view()
+ @ipp.parallel(v, block=True)
+ def foo(x, y=5):
+ """multiply x * y"""
+ return x * y
+ z = foo.map([1, 2, 3])
+ self.assertEqual(z, [5, 10, 15])
+ z = foo.map([1, 2, 3], [1, 2, 3])
+ self.assertEqual(z, [1, 4, 9])
+
+
\ No newline at end of file
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ipyparallel-5.1.1/setup.py new/ipyparallel-5.2.0/setup.py
--- old/ipyparallel-5.1.1/setup.py 2016-06-22 02:20:14.000000000 +0200
+++ new/ipyparallel-5.2.0/setup.py 2016-08-14 19:49:09.000000000 +0200
@@ -123,13 +123,14 @@
extras_require = setuptools_args['extras_require'] = {
':python_version == "2.7"': ['futures'],
'nbext': ["notebook"],
+ 'test': [
+ 'nose',
+ 'ipython[test]',
+ 'testpath',
+ 'mock',
+ ],
}
-tests_require = setuptools_args['tests_require'] = [
- 'nose',
- 'ipython[test]',
- 'mock',
-]
if 'setuptools' in sys.modules:
setup_args.update(setuptools_args)