Hello community,

here is the log from the commit of package python-eliot for openSUSE:Factory checked in at 2020-03-04 09:39:31
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-eliot (Old)
 and /work/SRC/openSUSE:Factory/.python-eliot.new.26092 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-eliot" Wed Mar 4 09:39:31 2020 rev:6 rq:781108 version:1.12.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-eliot/python-eliot.changes 2020-01-13 22:23:21.982569492 +0100 +++ /work/SRC/openSUSE:Factory/.python-eliot.new.26092/python-eliot.changes 2020-03-04 09:39:39.709901041 +0100 @@ -1,0 +2,8 @@ +Mon Mar 2 15:47:15 UTC 2020 - Marketa Calabkova <[email protected]> + +- Update to 1.12.0 + * Dask support now includes support for tracing logging of dask.persist(), + via wrapper API eliot.dask.persist_with_trace(). + * Dask edge cases that previously weren't handled correctly should work better. + +------------------------------------------------------------------- Old: ---- python-eliot-1.11.0.tar.gz New: ---- python-eliot-1.12.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-eliot.spec ++++++ --- /var/tmp/diff_new_pack.ZHdZ47/_old 2020-03-04 09:39:42.045902441 +0100 +++ /var/tmp/diff_new_pack.ZHdZ47/_new 2020-03-04 09:39:42.045902441 +0100 @@ -19,7 +19,7 @@ %{?!python_module:%define python_module() python-%{**} python3-%{**}} %define skip_python2 1 Name: python-eliot -Version: 1.11.0 +Version: 1.12.0 Release: 0 Summary: A logging system that tells the user why something happened License: Apache-2.0 ++++++ python-eliot-1.11.0.tar.gz -> python-eliot-1.12.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/docs/source/news.rst new/eliot-1.12.0/docs/source/news.rst --- old/eliot-1.11.0/docs/source/news.rst 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/docs/source/news.rst 2020-01-23 21:52:58.000000000 +0100 @@ -1,6 +1,17 @@ What's New ========== +1.12.0 +^^^^^^ + +Features: + +* Dask support now includes support for tracing logging of ``dask.persist()``, via wrapper API ``eliot.dask.persist_with_trace()``. 
+ +Bug fixes: + +* Dask edge cases that previously weren't handled correctly should work better. + 1.11.0 ^^^^^^ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/docs/source/reading/reading.rst new/eliot-1.12.0/docs/source/reading/reading.rst --- old/eliot-1.11.0/docs/source/reading/reading.rst 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/docs/source/reading/reading.rst 2020-01-23 21:52:58.000000000 +0100 @@ -18,7 +18,7 @@ Run ``eliot-prettyprint --help`` to see the various formatting options; you can for example use a more compact one-message-per-line format. -Additionally, the **highly recommended third-party `eliot-tree`_ tool** renders JSON-formatted Eliot messages into a tree visualizing the tasks' actions. +Additionally, the **highly recommended** third-party `eliot-tree`_ tool renders JSON-formatted Eliot messages into a tree visualizing the tasks' actions. Filtering logs diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/docs/source/scientific-computing.rst new/eliot-1.12.0/docs/source/scientific-computing.rst --- old/eliot-1.11.0/docs/source/scientific-computing.rst 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/docs/source/scientific-computing.rst 2020-01-23 21:52:58.000000000 +0100 @@ -44,8 +44,9 @@ * Ensure all worker processes write the Eliot logs to disk (if you're using the ``multiprocessing`` or ``distributed`` backends). * If you're using multiple worker machines, aggregate all log files into a single place, so you can more easily analyze them with e.g. `eliot-tree <https://github.com/jonathanj/eliottree>`_. * Replace ``dask.compute()`` with ``eliot.dask.compute_with_trace()``. +* Replace ``dask.persist()`` with ``eliot.dask.persist_with_trace()``. -In the following example, you can see how this works for a Dask run using ``distributed``, the recommended Dask scheduler. 
+In the following example, you can see how this works for a Dask run using ``distributed``, the recommended Dask scheduler for more sophisticated use cases. We'll be using multiple worker processes, but only use a single machine: .. literalinclude:: ../../examples/dask_eliot.py diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/eliot/_validation.py new/eliot-1.12.0/eliot/_validation.py --- old/eliot-1.11.0/eliot/_validation.py 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/eliot/_validation.py 2020-01-23 21:52:58.000000000 +0100 @@ -388,7 +388,7 @@ this action's start message. @ivar successFields: A C{list} of L{Field} instances which can appear in - this action's succesful finish message. + this action's successful finish message. @ivar failureFields: A C{list} of L{Field} instances which can appear in this action's failed finish message (in addition to the built-in diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/eliot/_version.py new/eliot-1.12.0/eliot/_version.py --- old/eliot-1.11.0/eliot/_version.py 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/eliot/_version.py 2020-01-23 21:52:58.000000000 +0100 @@ -22,9 +22,9 @@ # setup.py/versioneer.py will grep for the variable names, so they must # each be defined on a line of their own. _version.py will just call # get_keywords(). 
- git_refnames = " (tag: 1.11.0)" - git_full = "4ca0fa7519321aceec860e982123a5c448a9debd" - git_date = "2019-12-07 14:22:41 -0500" + git_refnames = " (HEAD -> master, tag: 1.12.0)" + git_full = "4470cc147cd29e59f12730ec3c811a89e61edc65" + git_date = "2020-01-23 15:52:58 -0500" keywords = {"refnames": git_refnames, "full": git_full, "date": git_date} return keywords diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/eliot/dask.py new/eliot-1.12.0/eliot/dask.py --- old/eliot-1.11.0/eliot/dask.py 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/eliot/dask.py 2020-01-23 21:52:58.000000000 +0100 @@ -2,8 +2,17 @@ from pyrsistent import PClass, field -from dask import compute, optimize -from dask.core import toposort, get_dependencies +from dask import compute, optimize, persist + +try: + from dask.distributed import Future +except: + + class Future(object): + pass + + +from dask.core import toposort, get_dependencies, ishashable from . import start_action, current_action, Action @@ -75,6 +84,22 @@ return compute(*optimized, optimize_graph=False) +def persist_with_trace(*args): + """Do Dask persist(), but with added Eliot tracing. + + Known issues: + + 1. Retries will confuse Eliot. Probably need different + distributed-tree mechanism within Eliot to solve that. + """ + # 1. Create top-level Eliot Action: + with start_action(action_type="dask:persist"): + # In order to reduce logging verbosity, add logging to the already + # optimized graph: + optimized = optimize(*args, optimizations=[_add_logging]) + return persist(*optimized, optimize_graph=False) + + def _add_logging(dsk, ignore=None): """ Add logging to a Dask graph. @@ -101,34 +126,43 @@ key_names = {} for key in keys: value = dsk[key] - if not callable(value) and value in keys: + if not callable(value) and ishashable(value) and value in keys: # It's an alias for another key: key_names[key] = key_names[value] else: key_names[key] = simplify(key) - # 2. 
Create Eliot child Actions for each key, in topological order: - key_to_action_id = {key: str(ctx.serialize_task_id(), "utf-8") for key in keys} + # Values in the graph can be either: + # + # 1. A list of other values. + # 2. A tuple, where first value might be a callable, aka a task. + # 3. A literal of some sort. + def maybe_wrap(key, value): + if isinstance(value, list): + return [maybe_wrap(key, v) for v in value] + elif isinstance(value, tuple): + func = value[0] + args = value[1:] + if not callable(func): + # Not a callable, so nothing to wrap. + return value + wrapped_func = _RunWithEliotContext( + task_id=str(ctx.serialize_task_id(), "utf-8"), + func=func, + key=key_names[key], + dependencies=[key_names[k] for k in get_dependencies(dsk, key)], + ) + return (wrapped_func,) + args + else: + return value - # 3. Replace function with wrapper that logs appropriate Action: + # Replace function with wrapper that logs appropriate Action; iterate in + # topological order so action task levels are in reasonable order. 
for key in keys: - func = dsk[key][0] - args = dsk[key][1:] - if not callable(func): - # This key is just an alias for another key, no need to add - # logging: - result[key] = dsk[key] - continue - wrapped_func = _RunWithEliotContext( - task_id=key_to_action_id[key], - func=func, - key=key_names[key], - dependencies=[key_names[k] for k in get_dependencies(dsk, key)], - ) - result[key] = (wrapped_func,) + tuple(args) + result[key] = maybe_wrap(key, dsk[key]) assert set(result.keys()) == set(dsk.keys()) return result -__all__ = ["compute_with_trace"] +__all__ = ["compute_with_trace", "persist_with_trace"] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/eliot/tests/test_dask.py new/eliot-1.12.0/eliot/tests/test_dask.py --- old/eliot-1.11.0/eliot/tests/test_dask.py 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/eliot/tests/test_dask.py 2020-01-23 21:52:58.000000000 +0100 @@ -3,15 +3,23 @@ from unittest import TestCase, skipUnless from ..testing import capture_logging, LoggedAction, LoggedMessage -from .. import start_action, Message +from .. 
import start_action, log_message try: import dask from dask.bag import from_sequence + from dask.distributed import Client + import dask.dataframe as dd + import pandas as pd except ImportError: dask = None else: - from ..dask import compute_with_trace, _RunWithEliotContext, _add_logging + from ..dask import ( + compute_with_trace, + _RunWithEliotContext, + _add_logging, + persist_with_trace, + ) @skipUnless(dask, "Dask not available.") @@ -28,22 +36,66 @@ bag = bag.fold(lambda x, y: x + y) self.assertEqual(dask.compute(bag), compute_with_trace(bag)) + def test_future(self): + """compute_with_trace() can handle Futures.""" + client = Client(processes=False) + self.addCleanup(client.shutdown) + [bag] = dask.persist(from_sequence([1, 2, 3])) + bag = bag.map(lambda x: x * 5) + result = dask.compute(bag) + self.assertEqual(result, ([5, 10, 15],)) + self.assertEqual(result, compute_with_trace(bag)) + + def test_persist_result(self): + """persist_with_trace() runs the same logic as process().""" + client = Client(processes=False) + self.addCleanup(client.shutdown) + bag = from_sequence([1, 2, 3]) + bag = bag.map(lambda x: x * 7) + self.assertEqual( + [b.compute() for b in dask.persist(bag)], + [b.compute() for b in persist_with_trace(bag)], + ) + + def test_persist_pandas(self): + """persist_with_trace() with a Pandas dataframe. + + This ensures we don't blow up, which used to be the case. 
+ """ + df = pd.DataFrame() + df = dd.from_pandas(df, npartitions=1) + persist_with_trace(df) + + @capture_logging(None) + def test_persist_logging(self, logger): + """persist_with_trace() preserves Eliot context.""" + + def persister(bag): + [bag] = persist_with_trace(bag) + return dask.compute(bag) + + self.assert_logging(logger, persister, "dask:persist") + @capture_logging(None) - def test_logging(self, logger): + def test_compute_logging(self, logger): """compute_with_trace() preserves Eliot context.""" + self.assert_logging(logger, compute_with_trace, "dask:compute") + + def assert_logging(self, logger, run_with_trace, top_action_name): + """Utility function for _with_trace() logging tests.""" def mult(x): - Message.log(message_type="mult") + log_message(message_type="mult") return x * 4 def summer(x, y): - Message.log(message_type="finally") + log_message(message_type="finally") return x + y bag = from_sequence([1, 2]) bag = bag.map(mult).fold(summer) with start_action(action_type="act1"): - compute_with_trace(bag) + run_with_trace(bag) [logged_action] = LoggedAction.ofType(logger.messages, "act1") self.assertEqual( @@ -51,7 +103,7 @@ { "act1": [ { - "dask:compute": [ + top_action_name: [ {"eliot:remote_task": ["dask:task", "mult"]}, {"eliot:remote_task": ["dask:task", "mult"]}, {"eliot:remote_task": ["dask:task", "finally"]}, @@ -83,6 +135,8 @@ class AddLoggingTests(TestCase): """Tests for _add_logging().""" + maxDiff = None + def test_add_logging_to_full_graph(self): """_add_logging() recreates Dask graph with wrappers.""" bag = from_sequence([1, 2, 3]) @@ -104,3 +158,52 @@ logging_removed[key] = value self.assertEqual(logging_removed, graph) + + def test_add_logging_explicit(self): + """_add_logging() on more edge cases of the graph.""" + + def add(s): + return s + "s" + + def add2(s): + return s + "s" + + # b runs first, then d, then a and c. 
+ graph = { + "a": "d", + "d": [1, 2, (add, "b")], + ("b", 0): 1, + "c": (add2, "d"), + } + + with start_action(action_type="bleh") as action: + task_id = action.task_uuid + self.assertEqual( + _add_logging(graph), + { + "d": [ + 1, + 2, + ( + _RunWithEliotContext( + task_id=task_id + "@/2", + func=add, + key="d", + dependencies=["b"], + ), + "b", + ), + ], + "a": "d", + ("b", 0): 1, + "c": ( + _RunWithEliotContext( + task_id=task_id + "@/3", + func=add2, + key="c", + dependencies=["d"], + ), + "d", + ), + }, + ) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/eliot-1.11.0/tox.ini new/eliot-1.12.0/tox.ini --- old/eliot-1.11.0/tox.ini 2019-12-07 20:22:41.000000000 +0100 +++ new/eliot-1.12.0/tox.ini 2020-01-23 21:52:58.000000000 +0100 @@ -32,6 +32,9 @@ basepython = python3.7 deps = cffi dask[bag] + dask[distributed] + dask[pandas] + pandas [testenv:py38] basepython = python3.8
