Hello community,
here is the log from the commit of package python-celery-batches for
openSUSE:Leap:15.2 checked in at 2020-04-02 16:48:19
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Leap:15.2/python-celery-batches (Old)
and /work/SRC/openSUSE:Leap:15.2/.python-celery-batches.new.3248 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-celery-batches"
Thu Apr 2 16:48:19 2020 rev:4 rq:790108 version:0.3
Changes:
--------
---
/work/SRC/openSUSE:Leap:15.2/python-celery-batches/python-celery-batches.changes
2020-03-16 12:21:09.107709434 +0100
+++
/work/SRC/openSUSE:Leap:15.2/.python-celery-batches.new.3248/python-celery-batches.changes
2020-04-02 16:48:20.889926887 +0200
@@ -1,0 +2,11 @@
+Mon Mar 30 13:47:17 UTC 2020 - Marketa Calabkova <[email protected]>
+
+- Update to version 0.3
+ * Properly set the ``current_task`` when running Batch tasks.
+ * Call the success signal after a successful run of the Batch task.
+ * Support running tasks eagerly via the ``Task.apply()`` method. This causes
+ the task to execute with a batch of a single item.
+ * Officially support Python 3.7 and 3.8. Drop support for Python 3.4.
+ * Officially support Celery 4.3 and 4.4.
+
+-------------------------------------------------------------------
Old:
----
celery-batches-0.2.tar.gz
New:
----
celery-batches-0.3.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-celery-batches.spec ++++++
--- /var/tmp/diff_new_pack.8cy7V8/_old 2020-04-02 16:48:21.293928477 +0200
+++ /var/tmp/diff_new_pack.8cy7V8/_new 2020-04-02 16:48:21.293928477 +0200
@@ -1,7 +1,7 @@
#
# spec file for package python-celery-batches
#
-# Copyright (c) 2019 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2020 SUSE LLC
# Copyright (c) 2018 Matthias Fehring <[email protected]>
#
# All modifications and additions to the file contributed by third parties
@@ -20,7 +20,7 @@
%{?!python_module:%define python_module() python-%{**} python3-%{**}}
%define _pkgname celery-batches
Name: python-%{_pkgname}
-Version: 0.2
+Version: 0.3
Release: 0
Summary: Django module to process multiple Celery task requests together
License: BSD-3-Clause
++++++ celery-batches-0.2.tar.gz -> celery-batches-0.3.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/.gitignore
new/celery-batches-0.3/.gitignore
--- old/celery-batches-0.2/.gitignore 2018-04-20 18:59:14.000000000 +0200
+++ new/celery-batches-0.3/.gitignore 2020-01-30 00:19:01.000000000 +0100
@@ -14,3 +14,6 @@
# Coverage related.
.coverage
htmlcov
+
+# Editor related.
+.idea
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/.travis.yml
new/celery-batches-0.3/.travis.yml
--- old/celery-batches-0.2/.travis.yml 2018-04-20 18:59:14.000000000 +0200
+++ new/celery-batches-0.3/.travis.yml 2020-01-30 00:19:01.000000000 +0100
@@ -1,35 +1,64 @@
language: python
-sudo: required
-dist: trusty
+dist: bionic
cache: pip
python:
- '2.7'
- - '3.4'
- '3.5'
- '3.6'
+ - '3.7'
+ - '3.8'
- 'pypy'
- 'pypy3'
os:
- linux
+stages:
+ - lint
+ - test
env:
- CELERY_VERSION=40
- CELERY_VERSION=41
+ - CELERY_VERSION=42
+ - CELERY_VERSION=43
+ - CELERY_VERSION=44
- CELERY_VERSION=master
+matrix:
+ include:
+ - python: '3.6'
+ env: TOXENV=flake8
+ stage: lint
+ # Celery 4.3 added support for Python >= 3.7.
+ exclude:
+ - python: '3.7'
+ env: CELERY_VERSION=40
+ - python: '3.7'
+ env: CELERY_VERSION=41
+ - python: '3.7'
+ env: CELERY_VERSION=42
+ - python: '3.8'
+ env: CELERY_VERSION=40
+ - python: '3.8'
+ env: CELERY_VERSION=41
+ - python: '3.8'
+ env: CELERY_VERSION=42
+ allow_failures:
+ - env: CELERY_VERSION=master
+ - python: pypy
before_install:
- - export TOXENV=${TRAVIS_PYTHON_VERSION}-celery${CELERY_VERSION}
- - |
- if [[ "$TOXENV" =~ "pypy" ]]; then
- export PYENV_ROOT="$HOME/.pyenv"
- if [ -f "$PYENV_ROOT/bin/pyenv" ]; then
- cd "$PYENV_ROOT" && git pull
- else
- rm -rf "$PYENV_ROOT" && git clone --depth 1
https://github.com/pyenv/pyenv.git "$PYENV_ROOT"
- fi
- "$PYENV_ROOT/bin/pyenv" install "$PYPY_VERSION"
- virtualenv
--python="$PYENV_ROOT/versions/$PYPY_VERSION/bin/python"
"$HOME/virtualenvs/$PYPY_VERSION"
- source "$HOME/virtualenvs/$PYPY_VERSION/bin/activate"
- which python
+ # If TOXENV is not set, build it from the Python and Celery versions.
+ - if [[ -v CELERY_VERSION ]]; then export
TOXENV=${TRAVIS_PYTHON_VERSION}-celery${CELERY_VERSION}; fi; env
+ - |
+ if [[ "$TOXENV" =~ "pypy" ]]; then
+ export PYENV_ROOT="$HOME/.pyenv"
+ if [ -f "$PYENV_ROOT/bin/pyenv" ]; then
+ cd "$PYENV_ROOT" && git pull
+ else
+ rm -rf "$PYENV_ROOT" && git clone --depth 1
https://github.com/pyenv/pyenv.git "$PYENV_ROOT"
fi
+ "$PYENV_ROOT/bin/pyenv" install "$PYPY_VERSION"
+ virtualenv --python="$PYENV_ROOT/versions/$PYPY_VERSION/bin/python"
"$HOME/virtualenvs/$PYPY_VERSION"
+ source "$HOME/virtualenvs/$PYPY_VERSION/bin/activate"
+ which python
+ fi
after_success:
- |
if [[ -v MATRIX_TOXENV || "$TOXENV" =~ "pypy" ]]; then
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/CHANGELOG.rst
new/celery-batches-0.3/CHANGELOG.rst
--- old/celery-batches-0.2/CHANGELOG.rst 2018-04-20 18:59:14.000000000
+0200
+++ new/celery-batches-0.3/CHANGELOG.rst 2020-01-30 00:19:01.000000000
+0100
@@ -3,6 +3,16 @@
Changelog
#########
+0.3 2020-01-29
+==============
+
+* Properly set the ``current_task`` when running Batch tasks.
+* Call the success signal after a successful run of the Batch task.
+* Support running tasks eagerly via the ``Task.apply()`` method. This causes
+ the task to execute with a batch of a single item.
+* Officially support Python 3.7 and 3.8. Drop support for Python 3.4.
+* Officially support Celery 4.3 and 4.4.
+
0.2 2018-04-20
==============
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/README.rst
new/celery-batches-0.3/README.rst
--- old/celery-batches-0.2/README.rst 2018-04-20 18:59:14.000000000 +0200
+++ new/celery-batches-0.3/README.rst 2020-01-30 00:19:01.000000000 +0100
@@ -8,7 +8,10 @@
History
=======
-Celery Batches was part of Celery (as ``celery.contrib.batches``) until Celery
-4.0. This is repository includes that history. The Batches code has been
updated
-to maintain compatible with newer versions of Celery and other fixes. See the
-Changelog for details.
+Celery Batches was distributed as part of Celery (as
``celery.contrib.batches``)
+until Celery 4.0. This project updates the Batches code to maintain
compatiblity
+with newer versions of Celery and other fixes. See the Changelog for details.
+
+Additionally, this repository includes the full history of the code from
+``celery.contrib.batches``, but rewritten to the ``celery_batches/__init__.py``
+file.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/celery_batches/__init__.py
new/celery-batches-0.3/celery_batches/__init__.py
--- old/celery-batches-0.2/celery_batches/__init__.py 2018-04-20
18:59:14.000000000 +0200
+++ new/celery-batches-0.3/celery_batches/__init__.py 2020-01-30
00:19:01.000000000 +0100
@@ -3,13 +3,24 @@
celery_batches
==============
-Experimental task class that buffers messages and processes them as a list.
+Experimental task class that buffers messages and processes them as a list.
Task
+requests are buffered in memory (on a worker) until either the flush count or
+flush interval is reached. Once the requests are flushed, they are sent to the
+task as a list of :class:`~celery_batches.SimpleRequest` instances.
+
+It is possible to return a result for each task request by calling
+``mark_as_done`` on your results backend. Returning a value from the Batch task
+call is only used to provide values to signals and does not populate into the
+results backend.
.. warning::
For this to work you have to set
:setting:`worker_prefetch_multiplier` to zero, or some value where
- the final multiplied value is higher than ``flush_every``.
+ the final multiplied value is higher than ``flush_every``. Note that Celery
+ will attempt to continually pull data into memory if this is set to zero.
+ This can cause excessive resource consumption on both Celery workers and
the
+ broker when used with a deep queue.
In the future we hope to add the ability to direct batching tasks
to a channel with different QoS requirements than the task channel.
@@ -45,7 +56,7 @@
import requests
from urlparse import urlparse
- from celery.contrib.batches import Batches
+ from celery_batches import Batches
wot_api_target = 'https://api.mywot.com/0.4/public_link_json'
@@ -86,8 +97,9 @@
from itertools import count
from celery import signals, states
+from celery._state import _task_stack
+from celery.app.task import Context, Task
from celery.five import Empty, Queue
-from celery.task import Task
from celery.utils import noop
from celery.utils.log import get_logger
from celery.worker.request import Request
@@ -127,36 +139,57 @@
send_prerun = signals.task_prerun.send
send_postrun = signals.task_postrun.send
+send_success = signals.task_success.send
SUCCESS = states.SUCCESS
FAILURE = states.FAILURE
def apply_batches_task(task, args, loglevel, logfile):
# Mimics some of the functionality found in celery.app.trace.trace_task.
+ request_stack = task.request_stack
+ push_request = request_stack.push
+ pop_request = request_stack.pop
+ push_task = _task_stack.push
+ pop_task = _task_stack.pop
+
prerun_receivers = signals.task_prerun.receivers
postrun_receivers = signals.task_postrun.receivers
+ success_receivers = signals.task_success.receivers
# Corresponds to multiple requests, so generate a new UUID.
task_id = uuid()
- if prerun_receivers:
- send_prerun(sender=task, task_id=task_id, task=task,
- args=args, kwargs={})
+ push_task(task)
+ task_request = Context(loglevel=loglevel, logfile=logfile)
+ push_request(task_request)
- task.push_request(loglevel=loglevel, logfile=logfile)
try:
- result = task(*args)
- state = SUCCESS
- except Exception as exc:
- result = None
- state = FAILURE
- logger.error('Error: %r', exc, exc_info=True)
+ # -*- PRE -*-
+ if prerun_receivers:
+ send_prerun(sender=task, task_id=task_id, task=task,
+ args=args, kwargs={})
+
+ # -*- TRACE -*-
+ try:
+ result = task(*args)
+ state = SUCCESS
+ except Exception as exc:
+ result = None
+ state = FAILURE
+ logger.error('Error: %r', exc, exc_info=True)
+ else:
+ if success_receivers:
+ send_success(sender=task, result=result)
finally:
- task.pop_request()
- if postrun_receivers:
- send_postrun(sender=task, task_id=task_id, task=task,
- args=args, kwargs={},
- retval=result, state=state)
+ try:
+ if postrun_receivers:
+ send_postrun(sender=task, task_id=task_id, task=task,
+ args=args, kwargs={},
+ retval=result, state=state)
+ finally:
+ pop_task()
+ pop_request()
+
return result
@@ -255,6 +288,26 @@
return task_message_handler
+ def apply(self, args=None, kwargs=None, *_args, **_kwargs):
+ """
+ Execute this task locally as a batch of size 1, by blocking until the
task returns.
+
+ Arguments:
+ args (Tuple): positional arguments passed on to the task.
+ Returns:
+ celery.result.EagerResult: pre-evaluated result.
+ """
+ request = SimpleRequest(
+ id=_kwargs.get("task_id", uuid()),
+ name="batch request",
+ args=args or (),
+ kwargs=kwargs or {},
+ delivery_info=None,
+ hostname="localhost",
+ )
+
+ return super(Batches, self).apply(([request],), {}, *_args, **_kwargs)
+
def flush(self, requests):
return self.apply_buffer(requests, ([SimpleRequest.from_request(r)
for r in requests],))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/requirements/pkgutils.txt
new/celery-batches-0.3/requirements/pkgutils.txt
--- old/celery-batches-0.2/requirements/pkgutils.txt 1970-01-01
01:00:00.000000000 +0100
+++ new/celery-batches-0.3/requirements/pkgutils.txt 2020-01-30
00:19:01.000000000 +0100
@@ -0,0 +1 @@
+flake8>3.5.0
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/requirements/test.txt
new/celery-batches-0.3/requirements/test.txt
--- old/celery-batches-0.2/requirements/test.txt 2018-04-20
18:59:14.000000000 +0200
+++ new/celery-batches-0.3/requirements/test.txt 2020-01-30
00:19:01.000000000 +0100
@@ -1,2 +1,2 @@
-pytest>=3.0,<3.3
+pytest>=3.8.0,<3.9
coverage
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/setup.cfg
new/celery-batches-0.3/setup.cfg
--- old/celery-batches-0.2/setup.cfg 2018-04-20 18:59:14.000000000 +0200
+++ new/celery-batches-0.3/setup.cfg 2020-01-30 00:19:01.000000000 +0100
@@ -1,3 +1,7 @@
+[flake8]
+# Don't use a strict line limit if it makes the code more readable.
+ignore = E501
+
[bdist_wheel]
universal = 1
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/setup.py
new/celery-batches-0.3/setup.py
--- old/celery-batches-0.2/setup.py 2018-04-20 18:59:14.000000000 +0200
+++ new/celery-batches-0.3/setup.py 2020-01-30 00:19:01.000000000 +0100
@@ -14,7 +14,7 @@
setuptools.setup(
name='celery-batches',
packages=setuptools.find_packages(),
- version='0.2',
+ version='0.3',
description='Experimental task class that buffers messages and processes
them as a list.',
long_description=long_description(),
keywords='task job queue distributed messaging actor',
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/t/integration/conftest.py
new/celery-batches-0.3/t/integration/conftest.py
--- old/celery-batches-0.2/t/integration/conftest.py 2018-04-20
18:59:14.000000000 +0200
+++ new/celery-batches-0.3/t/integration/conftest.py 2020-01-30
00:19:01.000000000 +0100
@@ -1,5 +1,6 @@
import pytest
+
@pytest.fixture(scope='session', params=[1, 2])
def celery_config(request):
return {
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/t/integration/tasks.py
new/celery-batches-0.3/t/integration/tasks.py
--- old/celery-batches-0.2/t/integration/tasks.py 2018-04-20
18:59:14.000000000 +0200
+++ new/celery-batches-0.3/t/integration/tasks.py 2020-01-30
00:19:01.000000000 +0100
@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
-from celery import chain, group, shared_task
-from celery.exceptions import SoftTimeLimitExceeded
+from celery import shared_task
from celery.utils.log import get_task_logger
from celery_batches import Batches
@@ -12,6 +11,7 @@
class Singleton(type):
_instances = {}
+
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args,
**kwargs)
@@ -38,6 +38,7 @@
result += request.args[0]
Results().set(result)
+ return result
@shared_task(base=Batches, flush_every=2, flush_interval=1)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/t/integration/test_batches.py
new/celery-batches-0.3/t/integration/test_batches.py
--- old/celery-batches-0.2/t/integration/test_batches.py 2018-04-20
18:59:14.000000000 +0200
+++ new/celery-batches-0.3/t/integration/test_batches.py 2020-01-30
00:19:01.000000000 +0100
@@ -5,27 +5,34 @@
from celery import signals
from celery.app.task import Task
-from celery.result import _set_task_join_will_block, allow_join_result
+from celery.result import allow_join_result
from celery.contrib.testing.tasks import ping
from .tasks import add, cumadd, Results
class SignalCounter(object):
- def __init__(self, expected_calls):
+ def __init__(self, expected_calls, callback=None):
self.calls = 0
self.expected_calls = expected_calls
+ self.callback = callback
def __call__(self, sender, **kwargs):
if isinstance(sender, Task):
- sender = sender.name
+ sender_name = sender.name
+ else:
+ sender_name = sender
# Ignore pings, those are used to ensure the worker processes tasks.
- if sender == 'celery.ping':
+ if sender_name == 'celery.ping':
return
self.calls += 1
+ # Call the "real" signal, if necessary.
+ if self.callback:
+ self.callback(sender, **kwargs)
+
def assert_calls(self):
assert self.calls == self.expected_calls
@@ -40,8 +47,32 @@
assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
+def test_always_eager():
+ """The batch task runs immediately, in the same thread."""
+ app = add._get_app()
+ task_always_eager = app.conf.task_always_eager
+ app.conf["task_always_eager"] = True
+
+ result = add.delay(1)
+
+ app.conf["task_always_eager"] = task_always_eager
+
+ # An EagerResult that resolve to 1 should be returned.
+ assert result.get() == 1
+ assert Results().get() == 1
+
+
+def test_apply():
+ """The batch task runs immediately, in the same thread."""
+ result = add.apply(args=(1, ))
+
+ # An EagerResult that resolve to 1 should be returned.
+ assert result.get() == 1
+ assert Results().get() == 1
+
+
def test_flush_interval(celery_worker):
- """The batch runs after the flush interval has elapsed."""
+ """The batch task runs after the flush interval has elapsed."""
add.delay(1)
# The flush interval is 1 second, this is longer.
@@ -54,7 +85,7 @@
def test_flush_calls(celery_worker):
- """The batch runs after two calls."""
+ """The batch task runs after two calls."""
add.delay(1)
add.delay(3)
@@ -65,6 +96,7 @@
def test_result(celery_worker):
+ """Each task call can return a result."""
result_1 = cumadd.delay(1)
result_2 = cumadd.delay(2)
@@ -76,7 +108,7 @@
def test_signals(celery_app, celery_worker):
- """The batch runs after two calls."""
+ """Ensure that Celery signals run for the batch task."""
# Configure a SignalCounter for each task signal.
checks = (
# Each task request gets published separately.
@@ -87,7 +119,7 @@
(signals.task_postrun, 1),
# Other task signals are not implemented.
(signals.task_retry, 0),
- (signals.task_success, 0),
+ (signals.task_success, 1),
(signals.task_failure, 0),
(signals.task_revoked, 0),
(signals.task_unknown, 0),
@@ -99,6 +131,7 @@
sig.connect(counter)
signal_counters.append(counter)
+ # The batch runs after 2 task calls.
add.delay(1)
add.delay(3)
@@ -112,5 +145,24 @@
counter.assert_calls()
+def test_current_task(celery_app, celery_worker):
+ """Ensure the current_task is properly set when running the task."""
+ def signal(sender, **kwargs):
+ assert celery_app.current_task.name == 't.integration.tasks.add'
+
+ counter = SignalCounter(1, signal)
+ signals.task_prerun.connect(counter)
+
+ # The batch runs after 2 task calls.
+ add.delay(1)
+ add.delay(3)
+
+ # Let the worker work.
+ _wait_for_ping()
+
+ # Should still have the correct result.
+ assert Results().get() == 4
+ counter.assert_calls()
+
# TODO
# * Test acking
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/celery-batches-0.2/tox.ini
new/celery-batches-0.3/tox.ini
--- old/celery-batches-0.2/tox.ini 2018-04-20 18:59:14.000000000 +0200
+++ new/celery-batches-0.3/tox.ini 2020-01-30 00:19:01.000000000 +0100
@@ -1,13 +1,24 @@
[tox]
envlist =
- {2.7,pypy,3.4,3.5,3.6,pypy3}-celery{40,41,master}
+ {2.7,pypy,3.4,3.5,3.6,pypy3}-celery{40,41,42}
+ # Celery 4.3 adds support for Python 3.7 and higher.
+ {3.7,3.8}-celery{43,44,master}
+ flake8
[testenv]
deps=
-r{toxinidir}/requirements/test.txt
celery40: celery>=4.0,<4.1
+ # Kombu 4.2.0 is incompatible with Celery 4.0.x. It is compatible with
+ # Celery 4.1.1 and 4.2.x.
+ celery40: kombu<4.2
celery41: celery>=4.1,<4.2
+ celery42: celery>=4.2,<4.3
+ celery43: celery>=4.3,<4.4
+ celery44: celery>=4.4,<4.5
celerymaster: https://codeload.github.com/celery/celery/zip/master
+
+ flake8: -r{toxinidir}/requirements/pkgutils.txt
sitepackages = False
recreate = False
commands =
@@ -15,9 +26,15 @@
coverage html
basepython =
2.7: python2.7
- 3.4: python3.4
3.5: python3.5
3.6: python3.6
+ 3.7: python3.7
+ 3.8: python3.8
pypy: pypy
pypy3: pypy3
+ flake8: python3.6
usedevelop = True
+
+[testenv:flake8]
+commands =
+ flake8 -j 2 {toxinidir}/celery_batches {toxinidir}/t