Hello community, here is the log from the commit of package python-streamz for openSUSE:Factory checked in at 2020-10-12 13:58:39 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-streamz (Old) and /work/SRC/openSUSE:Factory/.python-streamz.new.3486 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-streamz" Mon Oct 12 13:58:39 2020 rev:5 rq:840949 version:0.6.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-streamz/python-streamz.changes 2020-08-17 12:03:43.034638654 +0200 +++ /work/SRC/openSUSE:Factory/.python-streamz.new.3486/python-streamz.changes 2020-10-12 13:59:44.570226808 +0200 @@ -1,0 +2,47 @@ +Sat Oct 10 19:21:44 UTC 2020 - Arun Persaud <a...@gmx.de> + +- update to version 0.6.0: + * Fixed outdated docstring + * Merge pull request #362 from jbednar/docfixes + * Merge pull request #363 from jbednar/periodicdataframe + * Update docs/source/dataframes.rst + * Replaced system time with Pandas Timestamp + * Replaced system time with Pandas Timestamp + * Update docs/source/dataframes.rst + * Fixed flakes + * Added test_periodic_dataframes + * Replaced Tornado references with asyncio + * Updated dataframe docs + * Switched callbacks to use kwargs + * Added PeriodicDataFrame to the API docs + * Reformatted .rst files for 80 characters + * Fixed typos and formatting + * Fixed outdated or missing docs conf + * flake + * Added PeriodicDataFrame + * Added plotting section to the user guide + * Fixed typos + +- changes from version 0.5.6: + * Merge pull request #360 from chinmaychandak/windowing_fix + * Add empty windowing index test and fix TODO + * Windowing fix conforming to Pandas + * Style fix + * Windowing over n fix + * Change delta to nanosecond + * Revert npartitions elimination + * Fix windowing over time operations + * Remove print statement + * Remove npartitions parameter for CK -- to do for custreamz.kafka + * Merge pull request #288 from jdye64/287 + * Merge pull request #349 from + nils-braun/bugfix/336-problematic-dicts + * Merge pull request #350 from martindurant/fix_example_notebook + * Update for hvplot + * Use f-strings + * Fix typo + * Fix #336 by wrapping every object with a known key + * Add a test which fails on scattering dicts, as dask treats dicts + differently + +------------------------------------------------------------------- Old: ---- streamz-0.5.5.tar.gz New: ---- streamz-0.6.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-streamz.spec ++++++ --- /var/tmp/diff_new_pack.dnGnx9/_old 2020-10-12 13:59:45.702227279 +0200 +++ /var/tmp/diff_new_pack.dnGnx9/_new 2020-10-12 13:59:45.702227279 +0200 @@ -19,7 +19,7 @@ %{?!python_module:%define python_module() python-%{**} python3-%{**}} %define skip_python2 1 Name: python-streamz -Version: 0.5.5 +Version: 0.6.0 Release: 0 Summary: Tool to build continuous data pipelines License: BSD-3-Clause ++++++ streamz-0.5.5.tar.gz -> streamz-0.6.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/PKG-INFO new/streamz-0.6.0/PKG-INFO --- old/streamz-0.5.5/PKG-INFO 2020-08-11 15:36:04.000000000 +0200 +++ new/streamz-0.6.0/PKG-INFO 2020-09-26 03:08:42.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: streamz -Version: 0.5.5 +Version: 0.6.0 Summary: Streams Home-page: http://github.com/python-streamz/streamz/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/collections-api.rst new/streamz-0.6.0/docs/source/collections-api.rst --- old/streamz-0.5.5/docs/source/collections-api.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/collections-api.rst 2020-09-25 16:18:17.000000000 +0200 @@ -87,6 +87,9 @@ Rolling.var .. 
autosummary:: + PeriodicDataFrame + +.. autosummary:: Random Details diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/conf.py new/streamz-0.6.0/docs/source/conf.py --- old/streamz-0.5.5/docs/source/conf.py 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/conf.py 2020-09-25 16:18:17.000000000 +0200 @@ -48,7 +48,7 @@ # General information about the project. project = 'Streamz' -copyright = '2017, Matthew Rocklin' +copyright = '2017-2020, Matthew Rocklin' author = 'Matthew Rocklin' # The version info for the project you're documenting, acts as replacement for @@ -160,7 +160,7 @@ # dir menu entry, description, category) texinfo_documents = [ (master_doc, 'Streamz', 'Streamz Documentation', - author, 'Streamz', 'One line description of project.', + author, 'Streamz', 'Support for pipelines managing continuous streams of data.', 'Miscellaneous'), ] diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/core.rst new/streamz-0.6.0/docs/source/core.rst --- old/streamz-0.5.5/docs/source/core.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/core.rst 2020-09-25 16:18:17.000000000 +0200 @@ -15,7 +15,9 @@ map sink -You can create a basic pipeline by instantiating the ``Stream`` object and then using methods like ``map``, ``accumulate``, and ``sink``. +You can create a basic pipeline by instantiating the ``Stream`` +object and then using methods like ``map``, ``accumulate``, and +``sink``. .. code-block:: python @@ -27,7 +29,10 @@ source = Stream() source.map(increment).sink(print) -The ``map`` and ``sink`` methods both take a function and apply that function to every element in the stream. The ``map`` method returns a new stream with the modified elements while ``sink`` is typically used at the end of a stream for final actions. +The ``map`` and ``sink`` methods both take a function and apply that +function to every element in the stream. The ``map`` method returns a +new stream with the modified elements while ``sink`` is typically used +at the end of a stream for final actions. To push data through our pipeline we call ``emit`` @@ -383,14 +388,33 @@ Metadata -------- -Metadata can be emitted into the pipeline to accompany the data as a list of dictionaries. Most functions will pass the metadata to the downstream function without making any changes. However, functions that make the pipeline asynchronous require logic that dictates how and when the metadata will be passed downstream. Synchronous functions and asynchronous functions that have a 1:1 ratio of the number of values on the input to the number of values on the output will emit the metadata collection without any modification. However, functions that have multiple input streams or emit collections of data will emit the metadata associated with the emitted data as a collection. +Metadata can be emitted into the pipeline to accompany the data as a +list of dictionaries. Most functions will pass the metadata to the +downstream function without making any changes. However, functions +that make the pipeline asynchronous require logic that dictates how +and when the metadata will be passed downstream. Synchronous functions +and asynchronous functions that have a 1:1 ratio of the number of +values on the input to the number of values on the output will emit +the metadata collection without any modification. 
However, functions +that have multiple input streams or emit collections of data will emit +the metadata associated with the emitted data as a collection. Reference Counting and Checkpointing ------------------------------------ -Checkpointing is achieved in Streamz through the use of reference counting. With this method, a checkpoint can be saved when and only when data has progressed through all of the pipeline without any issues. This prevents data loss and guarantees at-least-once semantics. - -Any node that caches or holds data after it returns increments the reference counter associated with the given data by one. When a node is no longer holding the data, it will release it by decrementing the counter by one. When the counter changes to zero, a callback associated with the data is triggered. - -References are passed in the metadata as a value of the `ref` keyword. Each metadata object contains only one reference counter object. \ No newline at end of file +Checkpointing is achieved in Streamz through the use of reference +counting. With this method, a checkpoint can be saved when and only +when data has progressed through all of the pipeline without any +issues. This prevents data loss and guarantees at-least-once +semantics. + +Any node that caches or holds data after it returns increments the +reference counter associated with the given data by one. When a node +is no longer holding the data, it will release it by decrementing the +counter by one. When the counter changes to zero, a callback +associated with the data is triggered. + +References are passed in the metadata as a value of the `ref` +keyword. Each metadata object contains only one reference counter +object. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/dask.rst new/streamz-0.6.0/docs/source/dask.rst --- old/streamz-0.5.5/docs/source/dask.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/dask.rst 2020-09-25 16:18:17.000000000 +0200 @@ -36,7 +36,7 @@ from dask.distributed import Client client = Client() -This operates on a local processes or threads. If you have Bokeh installed +This operates on local processes or threads. If you have Bokeh installed then this will also start a diagnostics web server at http://localhost:8787/status which you may want to open to get a real-time view of execution. @@ -49,7 +49,7 @@ map sink -Before we build a parallel stream, lets build a sequential stream that maps a +Before we build a parallel stream, let's build a sequential stream that maps a simple function across data, and then prints those results. We use the core ``Stream`` object. @@ -69,7 +69,7 @@ for i in range(10): source.emit(i) -This should take ten seconds we call the ``inc`` function ten times +This should take ten seconds because we call the ``inc`` function ten times sequentially. Parallel Execution @@ -101,7 +101,7 @@ sense of the parallel execution. This should have run much more quickly depending on how many cores you have on -your machine. We added a few extra nodes to our stream, lets look at what they +your machine. We added a few extra nodes to our stream; let's look at what they did. - ``scatter``: Converted our Stream into a DaskStream. The elements that we @@ -123,17 +123,20 @@ +++++++ -An important gotcha with ``DaskStream`` is that it is a subclass ``Stream``, and so can be used as an input -to any function expecting a ``Stream``. 
If there is no intervening ``.gather()``, then the downstream node will -receive Dask futures instead of the data they represent:: +An important gotcha with ``DaskStream`` is that it is a subclass of +``Stream``, and so can be used as an input to any function expecting a +``Stream``. If there is no intervening ``.gather()``, then the +downstream node will receive Dask futures instead of the data they +represent:: source = Stream() source2 = Stream() a = source.scatter().map(inc) b = source2.combine_latest(a) -In this case, the combine operation will get real values from ``source2``, and Dask futures. -Downstream nodes would be free to operate on the futures, but more likely, the line should be:: +In this case, the combine operation will get real values from +``source2``, and Dask futures. Downstream nodes would be free to +operate on the futures, but more likely, the line should be:: b = source2.combine_latest(a.gather()) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/dataframes.rst new/streamz-0.6.0/docs/source/dataframes.rst --- old/streamz-0.5.5/docs/source/dataframes.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/dataframes.rst 2020-09-25 16:18:17.000000000 +0200 @@ -4,7 +4,7 @@ When handling large volumes of streaming tabular data it is often more efficient to pass around larger Pandas dataframes with many rows each rather than pass around individual Python tuples or dicts. Handling and computing on -data with Pandas can be much faster than operating on Python objects. +data with Pandas can be much faster than operating on individual Python objects. So one could imagine building streaming dataframe pipelines using the ``.map`` and ``.accumulate`` streaming operators with functions that consume and produce @@ -178,5 +178,79 @@ Not Yet Supported ----------------- -Streaming dataframes algorithms do not currently pay special attention to data +Streaming dataframe algorithms do not currently pay special attention to data arriving out-of-order. + + +PeriodicDataFrame +----------------- + +As you have seen above, Streamz can handle arbitrarily complex pipelines, +events, and topologies, but what if you simply want to run some Python +function periodically and collect or plot the results? + +streamz provides a high-level convenience class for this purpose, called +a PeriodicDataFrame. A PeriodicDataFrame uses Python's asyncio event loop +(used as part of Tornado in Jupyter and other interactive frameworks) to +call a user-provided function at a regular interval, collecting the results +and making them available for later processing. + +In the simplest case, you can use a PeriodicDataFrame by first writing +a callback function like: + +.. code-block:: python + + import numpy as np + + def random_datapoint(**kwargs): + return pd.DataFrame({'a': np.random.random(1)}, index=[pd.Timestamp.now()]) + +You can then make a streaming dataframe to poll this function +e.g. every 300 milliseconds: + +.. code-block:: python + + df = PeriodicDataFrame(random_datapoint, interval='300ms') + +``df`` will now be a steady stream of whatever values are returned by +the `datafn`, which can of course be any Python code as long as it +returns a DataFrame. + +Here we returned only a single point, appropriate for streaming the +results of system calls or other isolated actions, but any number of +entries can be returned by the dataframe in a single batch. 
To +facilitate collecting such batches, the callback is invoked with +keyword arguments ``last`` (the time of the previous invocation) and +``now`` (the time of the current invocation) as Pandas Timestamp +objects. The callback can then generate or query for just the values +in that time range. + +Arbitrary keyword arguments can be provided to the PeriodicDataFrame +constructor, which will be passed into the callback so that its behavior +can be parameterized. + +For instance, you can write a callback to return a suitable number of +datapoints to keep a regularly updating stream, generated randomly +as a batch since the last call: + +.. code-block:: python + + def datablock(last, now, **kwargs): + freq = kwargs.get("freq", pd.Timedelta("50ms")) + index = pd.date_range(start=last + freq, end=now, freq=freq) + return pd.DataFrame({'x': np.random.random(len(index))}, index=index) + + df = PeriodicDataFrame(datablock, interval='300ms') + +The callback will now be invoked every 300ms, each time generating +datapoints at a rate of 1 every 50ms, returned as a batch. If you +wished, you could override the 50ms value by passing +`freq=pd.Timedelta("100ms")` to the PeriodicDataFrame constructor. + +Similar code could e.g. query an external database for the time range +since the last update, returning all datapoints since then. + +Once you have a PeriodicDataFrame defined using such callbacks, you +can then use all the rest of the functionality supported by streamz, +including aggregations, rolling windows, etc., and streaming +`visualization <plotting>`_. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/gpu-dataframes.rst new/streamz-0.6.0/docs/source/gpu-dataframes.rst --- old/streamz-0.5.5/docs/source/gpu-dataframes.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/gpu-dataframes.rst 2020-09-25 16:18:17.000000000 +0200 @@ -1,13 +1,15 @@ -Streaming GPU DataFrames(cudf) ------------------------------- +Streaming GPU DataFrames (cudf) +------------------------------- -The ``streamz.dataframe`` module provides DataFrame-like interface on streaming -data as described in ``dataframes`` documentation. It provides support for dataframe -like libraries such as pandas and cudf. This documentation is specific to streaming GPU -dataframes(cudf). - -The example in the ``dataframes`` documentation is rewritten below using cudf dataframes -just by replacing ``pandas`` module with ``cudf``: +The ``streamz.dataframe`` module provides a DataFrame-like interface +on streaming data as described in the ``dataframes`` documentation. It +provides support for dataframe-like libraries such as pandas and +cudf. This documentation is specific to streaming GPU dataframes using +cudf. + +The example in the ``dataframes`` documentation is rewritten below +using cudf dataframes just by replacing the ``pandas`` module with +``cudf``: .. 
code-block:: python @@ -23,7 +25,7 @@ Supported Operations -------------------- -Streaming cudf dataframes support the following classes of operations +Streaming cudf dataframes support the following classes of operations: - Elementwise operations like ``df.x + 1`` - Filtering like ``df[df.name == 'Alice']`` @@ -31,11 +33,13 @@ - Reductions like ``df.amount.mean()`` - Windowed aggregations (fixed length) like ``df.window(n=100).amount.sum()`` -The following operations are not supported with cudf(as of version 0.8) yet +The following operations are not yet supported with cudf (as of version 0.8): + - Groupby-aggregations like ``df.groupby(df.name).amount.mean()`` - Windowed aggregations (index valued) like ``df.window(value='2h').amount.sum()`` - Windowed groupby aggregations like ``df.window(value='2h').groupby('name').amount.sum()`` -Window based Aggregations with cudf are supported just as explained in ``dataframes`` documentation. -The support for groupby operations will be added in future. +Window-based Aggregations with cudf are supported just as explained in +the ``dataframes`` documentation. Support for groupby operations is +expected to be added in the future. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/index.rst new/streamz-0.6.0/docs/source/index.rst --- old/streamz-0.5.5/docs/source/index.rst 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/docs/source/index.rst 2020-09-25 16:18:17.000000000 +0200 @@ -74,6 +74,26 @@ - pip: ``pip install streamz`` - dev: ``git clone https://github.com/python-streamz/streamz`` followed by ``pip install -e streamz/`` +Quickstart +---------- + +The streamz project offers a Docker image for the convenience of quickly trying out streamz and its features. +The purpose of the Dockerfile at this time is not to be used in a production +environment but rather for experimentation, learning, or new feature development. + +Its most common use would be to interact with the streamz example Jupyter notebooks. Let's walk through the steps needed for this. + +- Build the Docker container +.. code-block:: bash +$ docker/build.sh +- Run the Docker container +.. code-block:: bash +$ docker/run.sh +- Interact with Jupyter Lab on the container in your browser at `JUPYTER_LAB`_. +.. 
JUPYTER_LAB: http://localhost:8888/ + + + Related Work ------------ @@ -91,8 +111,10 @@ core.rst dataframes.rst + gpu-dataframes.rst dask.rst collections.rst api.rst collections-api.rst async.rst + plotting.rst diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/docs/source/plotting.rst new/streamz-0.6.0/docs/source/plotting.rst --- old/streamz-0.5.5/docs/source/plotting.rst 1970-01-01 01:00:00.000000000 +0100 +++ new/streamz-0.6.0/docs/source/plotting.rst 2020-09-25 16:18:17.000000000 +0200 @@ -0,0 +1,61 @@ +Visualizing streamz +=================== + +A variety of tools are available to help you understand, debug, and +visualize your streaming objects: + +- Most Streamz objects automatically display themselves in Jupyter + notebooks, periodically updating their visual representation as text + or tables by registering events with the Tornado IOLoop used by Jupyter +- The network graph underlying a stream can be visualized using `dot` to + render a PNG using `Stream.visualize(filename)` +- Streaming data can be visualized using the optional separate packages + hvPlot, HoloViews, and Panel (see below) + + +hvplot.streamz -------------- + +hvPlot is a separate plotting library providing Bokeh-based plots for +Pandas dataframes and a variety of other object types, including +streamz DataFrame and Series objects. + +See `hvplot.holoviz.org <https://hvplot.holoviz.org>`_ for +instructions on how to install hvplot. Once it is installed, you can +use the Pandas .plot() API to get a dynamically updating plot in +Jupyter or in Bokeh/Panel Server: + +.. code-block:: python + + import hvplot.streamz + from streamz.dataframe import Random + + df = Random() + df.hvplot(backlog=100) + +See the `streaming section +<https://hvplot.holoviz.org/user_guide/Streaming.html>`_ of the hvPlot +user guide for more details, and the `dataframes.ipynb` example that +comes with streamz for a simple runnable example. + + +HoloViews --------- + +hvPlot is built on HoloViews, and you can also use HoloViews directly +if you want more control over events and how they are processed. See +the `HoloViews user guide +<http://holoviews.org/user_guide/Streaming_Data.html>`_ for more +details. + + +Panel ----- + +Panel is a general-purpose dashboard and app framework, supporting a +wide variety of displayable objects as "Panes". Panel provides a +`streamz Pane +<https://panel.holoviz.org/reference/panes/Streamz.html>`_ for +rendering arbitrary streamz objects, and streamz DataFrames are +handled by the Panel `DataFrame Pane +<https://panel.holoviz.org/reference/panes/DataFrame.html>`_. 
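To tie the plotting options above together, here is a minimal sketch (an illustrative addition, not part of the upstream file) of serving the ``df.hvplot(backlog=100)`` plot as a small Panel app; it assumes the optional ``hvplot`` and ``panel`` packages are installed and reuses the ``Random`` demo source:

.. code-block:: python

    # Minimal sketch: serve a live view of a streaming dataframe with Panel.
    # Assumes hvplot and panel are installed; Random is just a demo source.
    import panel as pn
    import hvplot.streamz  # noqa: F401 -- registers .hvplot on streamz objects
    from streamz.dataframe import Random

    df = Random(interval='200ms')   # emits a batch of random rows every 200ms
    plot = df.hvplot(backlog=100)   # keep only the last 100 rows on screen
    pn.panel(plot).servable()       # view with: panel serve this_script.py

In Jupyter the final ``pn.panel(...)`` line is unnecessary; evaluating ``plot`` displays the updating Bokeh figure inline.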
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/examples/dataframes.ipynb new/streamz-0.6.0/examples/dataframes.ipynb --- old/streamz-0.5.5/examples/dataframes.ipynb 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/examples/dataframes.ipynb 2020-08-12 22:45:28.000000000 +0200 @@ -6,7 +6,9 @@ "metadata": {}, "outputs": [], "source": [ - "from streamz.dataframe import Random, DataFrame" + "from streamz.dataframe import Random, DataFrame\n", + "# this example requires hvplot\n", + "import hvplot.streamz" ] }, { @@ -38,8 +40,9 @@ "p = (DataFrame({'raw': sdf.x,\n", " 'smooth': sdf.x.rolling('100ms').mean(),\n", " 'very-smooth': sdf.x.rolling('500ms').mean()})\n", - " .plot(width=700)\n", - " )" + " .hvplot(width=700)\n", + " )\n", + "p" ] }, { @@ -66,7 +69,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.3" + "version": "3.7.6" } }, "nbformat": 4, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/setup.py new/streamz-0.6.0/setup.py --- old/streamz-0.5.5/setup.py 2020-08-11 15:34:48.000000000 +0200 +++ new/streamz-0.6.0/setup.py 2020-09-26 03:07:06.000000000 +0200 @@ -9,7 +9,7 @@ setup(name='streamz', - version='0.5.5', + version='0.6.0', description='Streams', url='http://github.com/python-streamz/streamz/', maintainer='Matthew Rocklin', diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/__init__.py new/streamz-0.6.0/streamz/__init__.py --- old/streamz-0.5.5/streamz/__init__.py 2020-08-11 15:34:04.000000000 +0200 +++ new/streamz-0.6.0/streamz/__init__.py 2020-09-26 03:07:06.000000000 +0200 @@ -8,4 +8,4 @@ except ImportError: pass -__version__ = '0.5.5' +__version__ = '0.6.0' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/dask.py new/streamz-0.6.0/streamz/dask.py --- old/streamz-0.5.5/streamz/dask.py 2020-07-10 19:59:06.000000000 +0200 +++ new/streamz-0.6.0/streamz/dask.py 2020-08-27 19:47:33.000000000 +0200 @@ -5,6 +5,7 @@ from tornado import gen from dask.compatibility import apply +from dask.base import tokenize from distributed.client import default_client from .core import Stream @@ -103,7 +104,14 @@ client = default_client() self._retain_refs(metadata) - future = yield client.scatter(x, asynchronous=True) + # We need to make sure that x is treated as it is by dask + # However, client.scatter works internally different for + # lists and dicts. So we always use a dict here to be sure + # we know the format exactly. The key will be taken as the + # dask identifier of the data. 
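+ # (dask.base.tokenize yields a deterministic token for most objects, so + # re-scattering equal data typically maps to the same dask key.)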
+ tokenized_x = f"{type(x).__name__}-{tokenize(x)}" + future_as_dict = yield client.scatter({tokenized_x: x}, asynchronous=True) + future = future_as_dict[tokenized_x] f = yield self._emit(future, metadata=metadata) self._release_refs(metadata) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/dataframe/__init__.py new/streamz-0.6.0/streamz/dataframe/__init__.py --- old/streamz-0.5.5/streamz/dataframe/__init__.py 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/streamz/dataframe/__init__.py 2020-09-25 16:18:17.000000000 +0200 @@ -1,3 +1,3 @@ from .core import (DataFrame, DataFrames, Frame, Frames, Series, Seriess, Index, - Rolling, Window, Random, GroupBy) + Rolling, Window, PeriodicDataFrame, Random, GroupBy) from .aggregations import Aggregation diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/dataframe/aggregations.py new/streamz-0.6.0/streamz/dataframe/aggregations.py --- old/streamz-0.5.5/streamz/dataframe/aggregations.py 2020-07-10 19:59:06.000000000 +0200 +++ new/streamz-0.6.0/streamz/dataframe/aggregations.py 2020-09-09 19:46:54.000000000 +0200 @@ -167,18 +167,20 @@ List of dataframes to decay """ dfs = deque(dfs) - dfs.append(new) + if len(new) > 0: + dfs.append(new) old = [] - n = sum(map(len, dfs)) - window - while n > 0: - if len(dfs[0]) <= n: - df = dfs.popleft() - old.append(df) - n -= len(df) - else: - old.append(dfs[0].iloc[:n]) - dfs[0] = dfs[0].iloc[n:] - n = 0 + if len(dfs) > 0: + n = sum(map(len, dfs)) - window + while n > 0: + if len(dfs[0]) <= n: + df = dfs.popleft() + old.append(df) + n -= len(df) + else: + old.append(dfs[0].iloc[:n]) + dfs[0] = dfs[0].iloc[n:] + n = 0 return dfs, old @@ -202,16 +204,21 @@ List of dataframes to decay """ dfs = deque(dfs) - dfs.append(new) - mx = max(df.index.max() for df in dfs) - mn = mx - pd.Timedelta(window) + if len(new) > 0: + dfs.append(new) old = [] - while pd.Timestamp(dfs[0].index.min()) < mn: - o = dfs[0].loc[:mn] - old.append(o) # TODO: avoid copy if fully lost - dfs[0] = dfs[0].iloc[len(o):] - if not len(dfs[0]): - dfs.popleft() + if len(dfs) > 0: + mx = max(df.index.max() for df in dfs) + mn = mx - pd.Timedelta(window) + pd.Timedelta('1ns') + while pd.Timestamp(dfs[0].index.min()) < mn: + o = dfs[0].loc[:mn] + if len(old) > 0: + old.append(o) + else: + old = [o] + dfs[0] = dfs[0].iloc[len(o):] + if not len(dfs[0]): + dfs.popleft() return dfs, old @@ -337,7 +344,8 @@ if 'groupers' in acc: groupers = deque(acc['groupers']) - groupers.append(grouper) + if len(grouper) > 0: + groupers.append(grouper) old_groupers, groupers = diff_align(dfs, groupers) else: old_groupers = [grouper] * len(old) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/dataframe/core.py new/streamz-0.6.0/streamz/dataframe/core.py --- old/streamz-0.5.5/streamz/dataframe/core.py 2020-07-10 19:59:06.000000000 +0200 +++ new/streamz-0.6.0/streamz/dataframe/core.py 2020-09-25 16:18:17.000000000 +0200 @@ -2,7 +2,6 @@ import operator from collections import OrderedDict -from time import time import numpy as np import pandas as pd import toolz @@ -802,10 +801,35 @@ return Streaming(outstream, example, stream_type=stream_type) -def _random_df(tup): - last, now, freq = tup - index = pd.date_range(start=(last + freq.total_seconds()) * 1e9, - end=now * 1e9, freq=freq) +def random_datapoint(now, **kwargs): + """Example of querying a single current value""" + return 
pd.DataFrame( + {'a': np.random.random(1)}, index=[now]) + + +def random_datablock(last, now, **kwargs): + """ + Example of querying over a time range since last update + + Parameters + ---------- + last: pd.Timestamp + Time of previous call to this function. + now: pd.Timestamp + Current time. + freq: pd.Timedelta, optional + The time interval between individual records to be returned. + For good throughput, should be much smaller than the + interval at which this function is called. + + Returns a pd.DataFrame with random values where: + + The x column is uniformly distributed. + The y column is Poisson distributed. + The z column is normally distributed. + """ + freq = kwargs.get("freq", pd.Timedelta("100ms")) + index = pd.date_range(start=last + freq, end=now, freq=freq) df = pd.DataFrame({'x': np.random.random(len(index)), 'y': np.random.poisson(size=len(index)), @@ -814,29 +838,32 @@ return df -class Random(DataFrame): - """ A streaming dataframe of random data - - The x column is uniformly distributed. - The y column is poisson distributed. - The z column is normally distributed. - - This class is experimental and will likely be removed in the future +class PeriodicDataFrame(DataFrame): + """A streaming dataframe using the asyncio ioloop to poll a callback fn Parameters ---------- - freq: timedelta - The time interval between records + datafn: callable + Callback function accepting **kwargs and returning a + pd.DataFrame. kwargs will include at least + 'last' (pd.Timestamp.now() when datafn was last invoked), and + 'now' (current pd.Timestamp.now()). interval: timedelta - The time interval between new dataframes, should be significantly - larger than freq + The time interval between new dataframes. + dask: boolean + If true, uses a DaskStream instead of a regular Source. + **kwargs: + Optional keyword arguments to be passed into the callback function. + + By default, returns a three-column random pd.DataFrame generated + by the 'random_datablock' function. 
Example ------- - >>> source = Random(freq='100ms', interval='1s') # doctest: +SKIP + >>> df = PeriodicDataFrame(interval='1s', datafn=random_datapoint) # doctest: +SKIP """ - def __init__(self, freq='100ms', interval='500ms', dask=False): + def __init__(self, datafn=random_datablock, interval='500ms', dask=False, **kwargs): if dask: from streamz.dask import DaskStream source = DaskStream() @@ -844,17 +871,17 @@ else: source = Source() loop = IOLoop.current() - self.freq = pd.Timedelta(freq) self.interval = pd.Timedelta(interval).total_seconds() self.source = source self.continue_ = [True] + self.kwargs = kwargs - stream = self.source.map(_random_df) - example = _random_df((time(), time(), self.freq)) + stream = self.source.map(lambda x: datafn(**x, **kwargs)) + example = datafn(last=pd.Timestamp.now(), now=pd.Timestamp.now(), **kwargs) - super(Random, self).__init__(stream, example) + super(PeriodicDataFrame, self).__init__(stream, example) - loop.add_callback(self._cb, self.interval, self.freq, self.source, + loop.add_callback(self._cb, self.interval, self.source, self.continue_) def __del__(self): @@ -865,15 +892,34 @@ @staticmethod @gen.coroutine - def _cb(interval, freq, source, continue_): - last = time() + def _cb(interval, source, continue_): + last = pd.Timestamp.now() while continue_[0]: yield gen.sleep(interval) - now = time() - yield source._emit((last, now, freq)) + now = pd.Timestamp.now() + yield source._emit(dict(last=last, now=now)) last = now +class Random(PeriodicDataFrame): + """PeriodicDataFrame providing random values by default + + Accepts same parameters as PeriodicDataFrame, plus + `freq`, a string that will be converted to a pd.Timedelta + and passed to the 'datafn'. + + Useful mainly for examples and docs. + + Example + ------- + >>> source = Random(freq='100ms', interval='1s') # doctest: +SKIP + """ + + def __init__(self, freq='100ms', interval='500ms', dask=False, + datafn=random_datablock): + super(Random, self).__init__(datafn, interval, dask, freq=pd.Timedelta(freq)) + + _stream_types['streaming'].append((is_dataframe_like, DataFrame)) _stream_types['streaming'].append((is_index_like, Index)) _stream_types['streaming'].append((is_series_like, Series)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/dataframe/tests/test_dataframes.py new/streamz-0.6.0/streamz/dataframe/tests/test_dataframes.py --- old/streamz-0.5.5/streamz/dataframe/tests/test_dataframes.py 2020-08-11 15:34:07.000000000 +0200 +++ new/streamz-0.6.0/streamz/dataframe/tests/test_dataframes.py 2020-09-09 19:46:54.000000000 +0200 @@ -755,12 +755,11 @@ assert len(L) == 5 first = df.iloc[:diff] - lost = first[first.index.min() + value:] - first = first.iloc[len(lost):] + first = first[first.index.max() - value + pd.Timedelta('1ns'):] assert_eq(L[0], f(first)) - last = df.loc[index.max() - value + pd.Timedelta('1s'):] + last = df.loc[index.max() - value + pd.Timedelta('1ns'):] assert_eq(L[-1], f(last)) @@ -815,6 +814,42 @@ assert_eq(L[-1], f(last)) +def test_windowing_value_empty_intermediate_index(stream): + def preprocess(df): + mask = df["amount"] == 5 + df = df.loc[mask] + return df + + source = stream.map(preprocess) + + example = pd.DataFrame({"amount":[]}) + sdf = DataFrame(stream=source, example=example) + + output = sdf.window("2h").amount.sum().stream.gather().sink_to_list() + + stream.emit(pd.DataFrame({"amount": [1, 2, 3]}, index=[pd.Timestamp("2050-01-01 00:00:00"), + pd.Timestamp("2050-01-01 01:00:00"), + 
pd.Timestamp("2050-01-01 02:00:00")])) + + stream.emit(pd.DataFrame({"amount": [5, 5, 5]}, index=[pd.Timestamp("2050-01-01 03:00:00"), + pd.Timestamp("2050-01-01 04:00:00"), + pd.Timestamp("2050-01-01 05:00:00")])) + + stream.emit(pd.DataFrame({"amount": [4, 5, 6]}, index=[pd.Timestamp("2050-01-01 06:00:00"), + pd.Timestamp("2050-01-01 07:00:00"), + pd.Timestamp("2050-01-01 08:00:00")])) + + stream.emit(pd.DataFrame({"amount": [1, 2, 3]}, index=[pd.Timestamp("2050-01-01 09:00:00"), + pd.Timestamp("2050-01-01 10:00:00"), + pd.Timestamp("2050-01-01 11:00:00")])) + + stream.emit(pd.DataFrame({"amount": [5, 5, 5]}, index=[pd.Timestamp("2050-01-01 12:00:00"), + pd.Timestamp("2050-01-01 13:00:00"), + pd.Timestamp("2050-01-01 14:00:00")])) + + assert_eq(output, [0, 10, 5, 5, 10]) + + def test_window_full(): df = pd.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5}) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/tests/test_batch.py new/streamz-0.6.0/streamz/tests/test_batch.py --- old/streamz-0.5.5/streamz/tests/test_batch.py 2020-07-10 19:59:01.000000000 +0200 +++ new/streamz-0.6.0/streamz/tests/test_batch.py 2020-09-25 16:18:17.000000000 +0200 @@ -37,6 +37,21 @@ assert result.z.tolist() == [3 * i for i in range(10)] +def test_periodic_dataframes(): + pd = pytest.importorskip('pandas') + from streamz.dataframe import PeriodicDataFrame + from streamz.dataframe.core import random_datapoint + df = random_datapoint(now=pd.Timestamp.now()) + assert len(df) == 1 + + def callback(now, **kwargs): + return pd.DataFrame(dict(x=50, index=[now])) + + df = PeriodicDataFrame(callback, interval='20ms') + assert df.tail(0).x == 50 + df.stop() + + def test_filter(): a = Batch() f = a.filter(lambda x: x % 2 == 0) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz/tests/test_dask.py new/streamz-0.6.0/streamz/tests/test_dask.py --- old/streamz-0.5.5/streamz/tests/test_dask.py 2020-07-10 19:59:06.000000000 +0200 +++ new/streamz-0.6.0/streamz/tests/test_dask.py 2020-08-27 19:47:33.000000000 +0200 @@ -29,6 +29,28 @@ @gen_cluster(client=True) +def test_map_on_dict(c, s, a, b): + # dask treats dicts differently, so we have to make sure + # the user sees no difference in the streamz api. 
+ # Regression test against #336 + def add_to_dict(d): + d["x"] = d["i"] + return d + + source = Stream(asynchronous=True) + futures = source.scatter().map(add_to_dict) + L = futures.gather().sink_to_list() + + for i in range(5): + yield source.emit({"i": i}) + + assert len(L) == 5 + for i, item in enumerate(sorted(L, key=lambda x: x["x"])): + assert item["x"] == i + assert item["i"] == i + + +@gen_cluster(client=True) def test_scan(c, s, a, b): source = Stream(asynchronous=True) futures = scatter(source).map(inc).scan(add) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz.egg-info/PKG-INFO new/streamz-0.6.0/streamz.egg-info/PKG-INFO --- old/streamz-0.5.5/streamz.egg-info/PKG-INFO 2020-08-11 15:36:04.000000000 +0200 +++ new/streamz-0.6.0/streamz.egg-info/PKG-INFO 2020-09-26 03:08:41.000000000 +0200 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: streamz -Version: 0.5.5 +Version: 0.6.0 Summary: Streams Home-page: http://github.com/python-streamz/streamz/ Maintainer: Matthew Rocklin diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/streamz-0.5.5/streamz.egg-info/SOURCES.txt new/streamz-0.6.0/streamz.egg-info/SOURCES.txt --- old/streamz-0.5.5/streamz.egg-info/SOURCES.txt 2020-08-11 15:36:04.000000000 +0200 +++ new/streamz-0.6.0/streamz.egg-info/SOURCES.txt 2020-09-26 03:08:41.000000000 +0200 @@ -25,6 +25,7 @@ docs/source/dataframes.rst docs/source/gpu-dataframes.rst docs/source/index.rst +docs/source/plotting.rst docs/source/images/complex.svg docs/source/images/cyclic.svg docs/source/images/inc-dec-add-print.svg
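For readers who want to exercise the ``streamz/dask.py`` scatter change outside of streamz, here is a minimal standalone sketch of the same dict-wrapping idea (illustrative only, not part of the package; it assumes a local ``dask.distributed.Client``):

.. code-block:: python

    # Sketch of the dict-wrapping workaround from the dask.py hunk above.
    # client.scatter() treats a bare dict as {name: data, ...} and scatters
    # each value separately, so an arbitrary object is wrapped in a
    # single-entry dict under a tokenized key and unwrapped afterwards.
    from dask.base import tokenize
    from dask.distributed import Client

    client = Client()                        # local cluster for testing
    x = {"i": 1}                             # the problematic case from #336
    key = f"{type(x).__name__}-{tokenize(x)}"
    future = client.scatter({key: x})[key]   # dict in -> dict of futures out
    assert future.result() == {"i": 1}       # the dict arrives intact

This mirrors what ``test_map_on_dict`` asserts end-to-end through ``source.scatter().map(...).gather()``.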