jenkins-bot has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/361274 )
Change subject: Tests: convert unittest to pytest
......................................................................
Tests: convert unittest to pytest
Bug: T154588
Change-Id: Ia75f32b212e36d613c784765a52f1317ac690810
---
M cumin/tests/unit/backends/test_direct.py
M cumin/tests/unit/backends/test_puppetdb.py
M cumin/tests/unit/test_backends.py
M cumin/tests/unit/test_cli.py
M cumin/tests/unit/test_grammar.py
M cumin/tests/unit/test_query.py
M cumin/tests/unit/test_transport.py
M cumin/tests/unit/transports/test_clustershell.py
M cumin/tests/unit/transports/test_init.py
9 files changed, 563 insertions(+), 570 deletions(-)
Approvals:
Giuseppe Lavagetto: Looks good to me, but someone else must approve
jenkins-bot: Verified
Volans: Looks good to me, approved
diff --git a/cumin/tests/unit/backends/test_direct.py
b/cumin/tests/unit/backends/test_direct.py
index feaea9b..d63440b 100644
--- a/cumin/tests/unit/backends/test_direct.py
+++ b/cumin/tests/unit/backends/test_direct.py
@@ -1,80 +1,77 @@
"""Direct backend tests."""
-import unittest
+import pytest
from ClusterShell.NodeSet import NodeSet
from cumin.backends import BaseQuery, InvalidQueryError, direct
-class TestDirectQueryClass(unittest.TestCase):
- """Direct backend query_class test class."""
-
- def test_query_class(self):
- """An instance of query_class should be an instance of BaseQuery."""
- query = direct.query_class({})
- self.assertIsInstance(query, BaseQuery)
+def test_direct_query_class():
+ """An instance of query_class should be an instance of BaseQuery."""
+ query = direct.query_class({})
+ assert isinstance(query, BaseQuery)
-class TestDirectQuery(unittest.TestCase):
+class TestDirectQuery(object):
"""Direct backend query test class."""
- def setUp(self):
- """Setup an instace of DirectQuery for each test."""
- self.query = direct.DirectQuery({})
+ def setup_method(self, _):
+ """Setup an instance of DirectQuery for each test."""
+ self.query = direct.DirectQuery({}) # pylint:
disable=attribute-defined-outside-init
def test_instantiation(self):
"""An instance of DirectQuery should be an instance of BaseQuery."""
- self.assertIsInstance(self.query, BaseQuery)
- self.assertDictEqual(self.query.config, {})
+ assert isinstance(self.query, BaseQuery)
+ assert self.query.config == {}
def test_add_category_fact(self):
"""Calling add_category() should raise InvalidQueryError."""
- with self.assertRaisesRegexp(InvalidQueryError, r"Category tokens are
not supported"):
+ with pytest.raises(InvalidQueryError, match='Category tokens are not
supported'):
self.query.add_category('F', 'key', 'value')
def test_add_hosts(self):
"""Calling add_hosts() should add the hosts to the NodeSet."""
- self.assertListEqual(list(self.query.hosts), [])
+ assert list(self.query.hosts) == []
# No hosts
self.query.add_hosts(NodeSet.fromlist([]))
- self.assertListEqual(list(self.query.hosts), [])
+ assert list(self.query.hosts) == []
# Single host
self.query.add_hosts(NodeSet.fromlist(['host']))
- self.assertListEqual(list(self.query.hosts), ['host'])
+ assert list(self.query.hosts) == ['host']
# Multiple hosts
self.query.add_hosts(NodeSet.fromlist(['host1', 'host2']))
- self.assertListEqual(list(self.query.hosts), ['host', 'host1',
'host2'])
+ assert list(self.query.hosts) == ['host', 'host1', 'host2']
# Negated query
self.query.add_hosts(NodeSet.fromlist(['host1']), neg=True)
- self.assertListEqual(list(self.query.hosts), ['host', 'host2'])
+ assert list(self.query.hosts) == ['host', 'host2']
# Globbing is not supported
- with self.assertRaisesRegexp(InvalidQueryError, r"Hosts globbing is
not supported"):
+ with pytest.raises(InvalidQueryError, match='Hosts globbing is not
supported'):
self.query.add_hosts(NodeSet.fromlist(['host1*']))
def test_open_subgroup(self):
"""Calling open_subgroup() should raise InvalidQueryError."""
- with self.assertRaisesRegexp(InvalidQueryError, r"Subgroups are not
supported"):
+ with pytest.raises(InvalidQueryError, match='Subgroups are not
supported'):
self.query.open_subgroup()
def test_close_subgroup(self):
"""Calling close_subgroup() should raise InvalidQueryError."""
- with self.assertRaisesRegexp(InvalidQueryError, r"Subgroups are not
supported"):
+ with pytest.raises(InvalidQueryError, match='Subgroups are not
supported'):
self.query.close_subgroup()
def test_add_and(self):
"""Calling add_and() should raise InvalidQueryError."""
- with self.assertRaisesRegexp(InvalidQueryError, r"Boolean AND operator
is not supported"):
+ with pytest.raises(InvalidQueryError, match='Boolean AND operator is
not supported'):
self.query.add_and()
def test_add_or(self):
"""Calling add_or() should be a noop."""
- self.assertListEqual(list(self.query.hosts), [])
+ assert list(self.query.hosts) == []
self.query.add_or()
- self.assertListEqual(list(self.query.hosts), [])
+ assert list(self.query.hosts) == []
def test_execute(self):
"""Calling execute() should return the list of hosts."""
- self.assertListEqual(list(self.query.hosts), self.query.execute())
+ assert list(self.query.hosts) == self.query.execute()
self.query.add_hosts(NodeSet.fromlist(['host1', 'host2']))
- self.assertListEqual(list(self.query.hosts), self.query.execute())
+ assert list(self.query.hosts) == self.query.execute()
diff --git a/cumin/tests/unit/backends/test_puppetdb.py
b/cumin/tests/unit/backends/test_puppetdb.py
index b3ee47b..081bf56 100644
--- a/cumin/tests/unit/backends/test_puppetdb.py
+++ b/cumin/tests/unit/backends/test_puppetdb.py
@@ -1,7 +1,6 @@
"""PuppetDB backend tests."""
# pylint: disable=invalid-name
-import unittest
-
+import pytest
import requests_mock
from requests.exceptions import HTTPError
@@ -9,162 +8,157 @@
from cumin.backends import BaseQuery, InvalidQueryError, puppetdb
-class TestPuppetDBQueryClass(unittest.TestCase):
- """PuppetDB backend query_class test class."""
-
- def test_query_class(self):
- """An instance of query_class should be an instance of BaseQuery."""
- query = puppetdb.query_class({})
- self.assertIsInstance(query, BaseQuery)
+def test_puppetdb_query_class():
+ """An instance of query_class should be an instance of BaseQuery."""
+ query = puppetdb.query_class({})
+ assert isinstance(query, BaseQuery)
-class TestPuppetDBQuery(unittest.TestCase):
+class TestPuppetDBQuery(object):
"""PuppetDB backend query test class."""
- def setUp(self):
+ def setup_method(self, _):
"""Setup an instace of PuppetDBQuery for each test."""
- self.query = puppetdb.PuppetDBQuery({})
+ self.query = puppetdb.PuppetDBQuery({}) # pylint:
disable=attribute-defined-outside-init
def test_instantiation(self):
"""An instance of PuppetDBQuery should be an instance of BaseQuery."""
- self.assertIsInstance(self.query, BaseQuery)
- self.assertEqual(self.query.url, 'https://localhost:443/v3/')
+ assert isinstance(self.query, BaseQuery)
+ assert self.query.url == 'https://localhost:443/v3/'
def test_category_getter(self):
"""Access to category property should return facts by default."""
- self.assertEqual(self.query.category, 'F')
+ assert self.query.category == 'F'
def test_category_setter(self):
"""Setting category property should accept only valid values, raise
InvalidQueryError otherwise."""
self.query.category = 'F'
- self.assertEqual(self.query.category, 'F')
+ assert self.query.category == 'F'
- with self.assertRaisesRegexp(InvalidQueryError, r"Invalid value
'invalid_value'"):
+ with pytest.raises(InvalidQueryError, match="Invalid value
'invalid_value'"):
self.query.category = 'invalid_value'
- with self.assertRaisesRegexp(InvalidQueryError, r"Mixed F: and R:
queries are currently not supported"):
+ with pytest.raises(InvalidQueryError, match='Mixed F: and R: queries
are currently not supported'):
self.query.category = 'R'
# Get a new query object to test also setting a resource before a fact
query = puppetdb.query_class({})
- self.assertEqual(query.category, 'F')
+ assert query.category == 'F'
query.category = 'R'
- self.assertEqual(query.category, 'R')
+ assert query.category == 'R'
- with self.assertRaisesRegexp(InvalidQueryError, r"Mixed F: and R:
queries are currently not supported"):
+ with pytest.raises(InvalidQueryError, match='Mixed F: and R: queries
are currently not supported'):
query.category = 'F'
def test_add_category_fact(self):
"""Calling add_category() with a fact should add the proper query
token to the object."""
- self.assertListEqual(self.query.current_group['tokens'], [])
+ assert self.query.current_group['tokens'] == []
# Base fact query
self.query.add_category('F', 'key', 'value')
- self.assertListEqual(self.query.current_group['tokens'], ['["=",
["fact", "key"], "value"]'])
+ assert self.query.current_group['tokens'] == ['["=", ["fact", "key"],
"value"]']
self.query.current_group['tokens'] = []
# Negated query
self.query.add_category('F', 'key', 'value', neg=True)
- self.assertListEqual(self.query.current_group['tokens'], ['["not",
["=", ["fact", "key"], "value"]]'])
+ assert self.query.current_group['tokens'] == ['["not", ["=", ["fact",
"key"], "value"]]']
self.query.current_group['tokens'] = []
# Different operator
self.query.add_category('F', 'key', 'value', operator='>=')
- self.assertListEqual(self.query.current_group['tokens'], ['[">=",
["fact", "key"], "value"]'])
+ assert self.query.current_group['tokens'] == ['[">=", ["fact", "key"],
"value"]']
self.query.current_group['tokens'] = []
# Regex operator
self.query.add_category('F', 'key', r'value\\escaped', operator='~')
- self.assertListEqual(self.query.current_group['tokens'], [r'["~",
["fact", "key"], "value\\\\escaped"]'])
+ assert self.query.current_group['tokens'] == [r'["~", ["fact", "key"],
"value\\\\escaped"]']
# != is not supported by PuppetDB
- with self.assertRaisesRegexp(InvalidQueryError, r"PuppetDB backend
doesn't support"):
+ with pytest.raises(InvalidQueryError, match="PuppetDB backend doesn't
support"):
self.query.add_category('F', 'key', 'value', operator='!=')
def test_add_category_resource_base(self):
"""Calling add_category() with a base resource query should add the
proper query token to the object."""
- self.assertListEqual(self.query.current_group['tokens'], [])
+ assert self.query.current_group['tokens'] == []
self.query.add_category('R', 'key', 'value')
- self.assertListEqual(self.query.current_group['tokens'],
- ['["and", ["=", "type", "Key"], ["=", "title",
"value"]]'])
+ assert self.query.current_group['tokens'] == ['["and", ["=", "type",
"Key"], ["=", "title", "value"]]']
def test_add_category_resource_class(self):
"""Calling add_category() with a class resource query should add the
proper query token to the object."""
- self.assertListEqual(self.query.current_group['tokens'], [])
+ assert self.query.current_group['tokens'] == []
self.query.add_category('R', 'class', 'classtitle')
- self.assertListEqual(self.query.current_group['tokens'],
- ['["and", ["=", "type", "Class"], ["=", "title",
"Classtitle"]]'])
+ assert self.query.current_group['tokens'] == ['["and", ["=", "type",
"Class"], ["=", "title", "Classtitle"]]']
def test_add_category_resource_class_path(self):
"""Calling add_category() with a class resource query should add the
proper query token to the object."""
- self.assertListEqual(self.query.current_group['tokens'], [])
+ assert self.query.current_group['tokens'] == []
self.query.add_category('R', 'class', 'resource::path::to::class')
- self.assertListEqual(self.query.current_group['tokens'],
- ['["and", ["=", "type", "Class"], ["=", "title",
"Resource::Path::To::Class"]]'])
+ assert self.query.current_group['tokens'] == \
+ ['["and", ["=", "type", "Class"], ["=", "title",
"Resource::Path::To::Class"]]']
def test_add_category_resource_neg(self):
"""Calling add_category() with a negated resource query should add the
proper query token to the object."""
self.query.add_category('R', 'key', 'value', neg=True)
- self.assertListEqual(self.query.current_group['tokens'],
- ['["not", ["and", ["=", "type", "Key"], ["=",
"title", "value"]]]'])
+ assert self.query.current_group['tokens'] == \
+ ['["not", ["and", ["=", "type", "Key"], ["=", "title", "value"]]]']
def test_add_category_resource_regex(self):
"""Calling add_category() with a regex resource query should add the
proper query token to the object."""
self.query.add_category('R', 'key', r'value\\escaped', operator='~')
- self.assertListEqual(self.query.current_group['tokens'],
- [r'["and", ["=", "type", "Key"], ["~", "title",
"value\\\\escaped"]]'])
+ assert self.query.current_group['tokens'] == \
+ [r'["and", ["=", "type", "Key"], ["~", "title",
"value\\\\escaped"]]']
def test_add_category_resource_class_regex(self):
"""Calling add_category() with a regex Class resource query should add
the proper query token to the object."""
self.query.add_category('R', 'Class', r'Role::(One|Another)',
operator='~')
- self.assertListEqual(self.query.current_group['tokens'],
- [r'["and", ["=", "type", "Class"], ["~", "title",
"Role::(One|Another)"]]'])
+ assert self.query.current_group['tokens'] == \
+ [r'["and", ["=", "type", "Class"], ["~", "title",
"Role::(One|Another)"]]']
def test_add_category_resource_parameter(self):
"""Calling add_category() with a resource's parameter query should add
the proper query token to the object."""
self.query.add_category('R', 'resource%param', 'value')
- self.assertListEqual(self.query.current_group['tokens'],
- ['["and", ["=", "type", "Resource"], ["=",
["parameter", "param"], "value"]]'])
+ assert self.query.current_group['tokens'] == \
+ ['["and", ["=", "type", "Resource"], ["=", ["parameter", "param"],
"value"]]']
def test_add_category_resource_parameter_regex(self):
"""Calling add_category() with a resource's parameter query with a
regex should raise InvalidQueryError."""
- with self.assertRaisesRegexp(InvalidQueryError, 'Regex operations are
not supported in PuppetDB'):
+ with pytest.raises(InvalidQueryError, match='Regex operations are not
supported in PuppetDB'):
self.query.add_category('R', 'resource%param', 'value.*',
operator='~')
def test_add_category_resource_field(self):
"""Calling add_category() with a resource's field query should add the
proper query token to the object."""
self.query.add_category('R', 'resource@field', 'value')
- self.assertListEqual(self.query.current_group['tokens'],
- ['["and", ["=", "type", "Resource"], ["=",
"field", "value"]]'])
+ assert self.query.current_group['tokens'] == \
+ ['["and", ["=", "type", "Resource"], ["=", "field", "value"]]']
def test_add_category_resource(self):
"""Calling add_category() with a resource type should add the proper
query token to the object."""
self.query.add_category('R', 'Resource')
- self.assertListEqual(self.query.current_group['tokens'], ['["and",
["=", "type", "Resource"]]'])
+ assert self.query.current_group['tokens'] == ['["and", ["=", "type",
"Resource"]]']
def test_add_category_resource_parameter_field(self):
"""Calling add_category() with both a parameter and a field should
raise InvalidQueryError."""
- with self.assertRaisesRegexp(InvalidQueryError, 'Resource key cannot
contain both'):
+ with pytest.raises(InvalidQueryError, match='Resource key cannot
contain both'):
self.query.add_category('R', 'resource@field%param')
def test_add_hosts(self):
"""Calling add_hosts() with a resource should add the proper query
token to the object."""
- self.assertListEqual(self.query.current_group['tokens'], [])
+ assert self.query.current_group['tokens'] == []
# No hosts
self.query.add_hosts([])
- self.assertListEqual(self.query.current_group['tokens'], [])
+ assert self.query.current_group['tokens'] == []
# Single host
self.query.add_hosts(['host'])
- self.assertListEqual(self.query.current_group['tokens'], ['["or",
["=", "{host_key}", "host"]]'])
+ assert self.query.current_group['tokens'] == ['["or", ["=",
"{host_key}", "host"]]']
self.query.current_group['tokens'] = []
# Multiple hosts
self.query.add_hosts(['host1', 'host2'])
- self.assertListEqual(self.query.current_group['tokens'],
- ['["or", ["=", "{host_key}", "host1"], ["=",
"{host_key}", "host2"]]'])
+ assert self.query.current_group['tokens'] == \
+ ['["or", ["=", "{host_key}", "host1"], ["=", "{host_key}",
"host2"]]']
self.query.current_group['tokens'] = []
# Negated query
self.query.add_hosts(['host1', 'host2'], neg=True)
- self.assertListEqual(self.query.current_group['tokens'],
- ['["not", ["or", ["=", "{host_key}", "host1"],
["=", "{host_key}", "host2"]]]'])
+ assert self.query.current_group['tokens'] == \
+ ['["not", ["or", ["=", "{host_key}", "host1"], ["=", "{host_key}",
"host2"]]]']
self.query.current_group['tokens'] = []
# Globbing hosts
self.query.add_hosts(['host1*.domain'])
- self.assertListEqual(self.query.current_group['tokens'], [r'["or",
["~", "{host_key}", "^host1.*\\.domain$"]]'])
+ assert self.query.current_group['tokens'] == [r'["or", ["~",
"{host_key}", "^host1.*\\.domain$"]]']
def test_open_subgroup(self):
"""Calling open_subgroup() should open a subgroup and relate it to
it's parent."""
@@ -173,28 +167,28 @@
parent['tokens'].append(child)
self.query.open_subgroup()
self.query.add_hosts(['host'])
- self.assertListEqual(self.query.current_group['tokens'],
child['tokens'])
- self.assertIsNotNone(self.query.current_group['parent'])
+ assert self.query.current_group['tokens'] == child['tokens']
+ assert self.query.current_group['parent'] is not None
def test_close_subgroup(self):
"""Calling close_subgroup() should close a subgroup and return to the
parent's context."""
self.query.open_subgroup()
self.query.close_subgroup()
- self.assertEqual(len(self.query.current_group['tokens']), 1)
- self.assertListEqual(self.query.current_group['tokens'][0]['tokens'],
[])
- self.assertIsNone(self.query.current_group['parent'])
+ assert len(self.query.current_group['tokens']) == 1
+ assert self.query.current_group['tokens'][0]['tokens'] == []
+ assert self.query.current_group['parent'] is None
def test_add_and(self):
"""Calling add_and() should set the boolean property to the current
group to 'and'."""
- self.assertIsNone(self.query.current_group['bool'])
+ assert self.query.current_group['bool'] is None
self.query.add_and()
- self.assertEqual(self.query.current_group['bool'], 'and')
+ assert self.query.current_group['bool'] == 'and'
def test_add_or(self):
"""Calling add_or() should set the boolean property to the current
group to 'or'."""
- self.assertIsNone(self.query.current_group['bool'])
+ assert self.query.current_group['bool'] is None
self.query.add_or()
- self.assertEqual(self.query.current_group['bool'], 'or')
+ assert self.query.current_group['bool'] == 'or'
def test_add_and_or(self):
"""Calling add_or() and add_and() in the same group should raise
InvalidQueryError."""
@@ -202,16 +196,17 @@
self.query.add_or()
self.query.add_hosts(['host2'])
- with self.assertRaises(InvalidQueryError):
+ with pytest.raises(InvalidQueryError):
self.query.add_and()
@requests_mock.Mocker()
-class TestPuppetDBQueryExecute(unittest.TestCase):
+class TestPuppetDBQueryExecute(object):
"""PuppetDBQuery test execute() method class."""
- def setUp(self):
+ def setup_method(self, _):
"""Setup an instace of PuppetDBQuery for each test."""
+ # pylint: disable=attribute-defined-outside-init
self.query = puppetdb.PuppetDBQuery({'puppetdb':
{'urllib3_disable_warnings': ['SubjectAltNameWarning']}})
def _register_uris(self, requests):
@@ -235,16 +230,16 @@
self._register_uris(requests)
self.query.add_hosts(['nodes_host1', 'nodes_host2'])
hosts = self.query.execute()
- self.assertListEqual(sorted(hosts), ['nodes_host1', 'nodes_host2'])
- self.assertEqual(requests.call_count, 1)
+ assert sorted(hosts) == ['nodes_host1', 'nodes_host2']
+ assert requests.call_count == 1
def test_resources_endpoint(self, requests):
"""Calling execute() with a query that goes to the resources endpoint
should return the list of hosts."""
self._register_uris(requests)
self.query.add_category('R', 'Class', 'value')
hosts = self.query.execute()
- self.assertListEqual(sorted(hosts), ['resources_host1',
'resources_host2'])
- self.assertEqual(requests.call_count, 1)
+ assert sorted(hosts) == ['resources_host1', 'resources_host2']
+ assert requests.call_count == 1
def test_with_boolean_operator(self, requests):
"""Calling execute() with a query with a boolean operator should
return the list of hosts."""
@@ -253,8 +248,8 @@
self.query.add_or()
self.query.add_hosts(['nodes_host2'])
hosts = self.query.execute()
- self.assertListEqual(sorted(hosts), ['nodes_host1', 'nodes_host2'])
- self.assertEqual(requests.call_count, 1)
+ assert sorted(hosts) == ['nodes_host1', 'nodes_host2']
+ assert requests.call_count == 1
def test_with_subgroup(self, requests):
"""Calling execute() with a query with a subgroup return the list of
hosts."""
@@ -265,23 +260,23 @@
self.query.add_hosts(['nodes_host2'])
self.query.close_subgroup()
hosts = self.query.execute()
- self.assertListEqual(sorted(hosts), ['nodes_host1', 'nodes_host2'])
- self.assertEqual(requests.call_count, 1)
+ assert sorted(hosts) == ['nodes_host1', 'nodes_host2']
+ assert requests.call_count == 1
def test_empty(self, requests):
"""Calling execute() with a query that return no hosts should return
an empty list."""
self._register_uris(requests)
hosts = self.query.execute()
- self.assertListEqual(hosts, [])
- self.assertEqual(requests.call_count, 1)
+ assert hosts == []
+ assert requests.call_count == 1
def test_error(self, requests):
"""Calling execute() if the request fails it should raise the requests
exception."""
self._register_uris(requests)
self.query.current_group['tokens'].append('invalid_query')
- with self.assertRaises(HTTPError):
+ with pytest.raises(HTTPError):
self.query.execute()
- self.assertEqual(requests.call_count, 1)
+ assert requests.call_count == 1
def test_complex_query(self, requests):
"""Calling execute() with a complex query should return the exptected
structure."""
@@ -299,5 +294,5 @@
self.query.add_and()
self.query.add_category('R', 'Class', value='MyClass', operator='=')
hosts = self.query.execute()
- self.assertListEqual(sorted(hosts), ['resources_host1',
'resources_host2'])
- self.assertEqual(requests.call_count, 1)
+ assert sorted(hosts) == ['resources_host1', 'resources_host2']
+ assert requests.call_count == 1
diff --git a/cumin/tests/unit/test_backends.py
b/cumin/tests/unit/test_backends.py
index ad7988d..2027c5e 100644
--- a/cumin/tests/unit/test_backends.py
+++ b/cumin/tests/unit/test_backends.py
@@ -1,14 +1,11 @@
"""Abstract query tests."""
-import unittest
+import pytest
from cumin.backends import BaseQuery
-class TestBaseQuery(unittest.TestCase):
- """Class BaseQuery tests."""
-
- def test_instantiation(self):
- """Class BaseQuery is not instantiable being an abstract class."""
- with self.assertRaises(TypeError):
- BaseQuery({}) # pylint: disable=abstract-class-instantiated
+def test_base_query_instantiation():
+ """Class BaseQuery is not instantiable being an abstract class."""
+ with pytest.raises(TypeError):
+ BaseQuery({}) # pylint: disable=abstract-class-instantiated
diff --git a/cumin/tests/unit/test_cli.py b/cumin/tests/unit/test_cli.py
index 4554c18..cd9ace9 100644
--- a/cumin/tests/unit/test_cli.py
+++ b/cumin/tests/unit/test_cli.py
@@ -1,12 +1,11 @@
"""CLI tests."""
-
import os
import tempfile
-import unittest
from logging import DEBUG, INFO
import mock
+import pytest
from cumin import cli, CuminError
@@ -16,201 +15,213 @@
_ARGV = ['-c', 'doc/examples/config.yaml', '-d', '-m', 'sync', 'host',
'command1', 'command2']
-class TestCLI(unittest.TestCase):
- """CLI module tests."""
+def _validate_parsed_args(args, no_commands=False):
+ """Validate that the parsed args have the proper values."""
+ assert args.debug
+ assert args.config == 'doc/examples/config.yaml'
+ assert args.hosts == 'host'
+ if no_commands:
+ assert args.dry_run
+ else:
+ assert args.commands == ['command1', 'command2']
- def _validate_parsed_args(self, args, no_commands=False):
- """Validate that the parsed args have the proper values."""
- self.assertTrue(args.debug)
- self.assertEqual(args.config, 'doc/examples/config.yaml')
- self.assertEqual(args.hosts, 'host')
- if no_commands:
- self.assertTrue(args.dry_run)
- else:
- self.assertEqual(args.commands, ['command1', 'command2'])
- def test_parse_args_ok(self):
- """A standard set of command line parameters should be properly parsed
into their respective variables."""
- args = cli.parse_args(argv=_ARGV)
- self._validate_parsed_args(args)
+def test_parse_args_ok():
+ """A standard set of command line parameters should be properly parsed
into their respective variables."""
+ args = cli.parse_args(argv=_ARGV)
+ _validate_parsed_args(args)
- with mock.patch.object(cli.sys, 'argv', ['progname'] + _ARGV):
- args = cli.parse_args()
- self._validate_parsed_args(args)
+ with mock.patch.object(cli.sys, 'argv', ['progname'] + _ARGV):
+ args = cli.parse_args()
+ _validate_parsed_args(args)
- def test_parse_args_no_commands(self):
- """If no commands are specified, dry-run mode should be implied."""
- args = cli.parse_args(argv=_ARGV[:-2])
- self._validate_parsed_args(args, no_commands=True)
- def test_parse_args_no_mode(self):
- """If mode is not speficied with multiple commands, parsing the args
should raise a parser error."""
- index = _ARGV.index('-m')
- with self.assertRaises(SystemExit):
- cli.parse_args(argv=_ARGV[:index] + _ARGV[index + 1:])
+def test_parse_args_no_commands():
+ """If no commands are specified, dry-run mode should be implied."""
+ args = cli.parse_args(argv=_ARGV[:-2])
+ _validate_parsed_args(args, no_commands=True)
- def test_get_running_user(self):
- """Unsufficient permissions or unknown user should raise RuntimeError
and a proper user should be detected."""
- env = {'USER': None, 'SUDO_USER': None}
- with mock.patch('os.getenv', env.get):
- with self.assertRaisesRegexp(CuminError, r'Insufficient
privileges, run with sudo'):
- cli.get_running_user()
- env = {'USER': 'root', 'SUDO_USER': None}
- with mock.patch('os.getenv', env.get):
- with self.assertRaisesRegexp(CuminError, r'Unable to determine
real user'):
- cli.get_running_user()
+def test_parse_args_no_mode():
+ """If mode is not specified with multiple commands, parsing the args
should raise a parser error."""
+ index = _ARGV.index('-m')
+ with pytest.raises(SystemExit):
+ cli.parse_args(argv=_ARGV[:index] + _ARGV[index + 1:])
- with mock.patch('os.getenv', _ENV.get):
- self.assertEqual(cli.get_running_user(), 'user')
- @mock.patch('cumin.cli.os')
- @mock.patch('cumin.cli.RotatingFileHandler')
- @mock.patch('cumin.cli.logger')
- def test_setup_logging(self, logging, file_handler, mocked_os):
- """Calling setup_logging() should properly setup the logger."""
- mocked_os.path.exists.return_value = False
- cli.setup_logging('/path/to/filename')
- logging.setLevel.assert_called_with(INFO)
+def test_get_running_user():
+ """Insufficient permissions or unknown user should raise RuntimeError and
a proper user should be detected."""
+ env = {'USER': None, 'SUDO_USER': None}
+ with mock.patch('os.getenv', env.get):
+ with pytest.raises(CuminError, match='Insufficient privileges, run
with sudo'):
+ cli.get_running_user()
- mocked_os.path.exists.return_value = True
- cli.setup_logging('filename', debug=True)
- logging.setLevel.assert_called_with(DEBUG)
- self.assertTrue(file_handler.called)
+ env = {'USER': 'root', 'SUDO_USER': None}
+ with mock.patch('os.getenv', env.get):
+ with pytest.raises(CuminError, match='Unable to determine real user'):
+ cli.get_running_user()
- def test_parse_config_ok(self):
- """The configuration file is properly parsed and accessible."""
- config = cli.parse_config('doc/examples/config.yaml')
- self.assertTrue('log_file' in config)
+ with mock.patch('os.getenv', _ENV.get):
+ assert cli.get_running_user() == 'user'
- def test_parse_config_non_existent(self):
- """A CuminError is raised if the configuration file is not
available."""
- with self.assertRaisesRegexp(CuminError, 'Unable to read configuration
file'):
- cli.parse_config('not_existent_config.yaml')
- def test_parse_config_invalid(self):
- """A CuminError is raised if the configuration cannot be parsed."""
- invalid_yaml = '\n'.join((
- 'foo:',
- ' bar: baz',
- ' - foobar',
- ))
- tmpfile, tmpfilepath = tempfile.mkstemp(suffix='config.yaml',
prefix='cumin', text=True)
- os.write(tmpfile, invalid_yaml)
+@mock.patch('cumin.cli.os')
+@mock.patch('cumin.cli.RotatingFileHandler')
+@mock.patch('cumin.cli.logger')
+def test_setup_logging(logging, file_handler, mocked_os):
+ """Calling setup_logging() should properly setup the logger."""
+ mocked_os.path.exists.return_value = False
+ cli.setup_logging('/path/to/filename')
+ logging.setLevel.assert_called_with(INFO)
- with self.assertRaisesRegexp(CuminError, 'Unable to parse
configuration file'):
- cli.parse_config(tmpfilepath)
+ mocked_os.path.exists.return_value = True
+ cli.setup_logging('filename', debug=True)
+ logging.setLevel.assert_called_with(DEBUG)
+ assert file_handler.called
- @mock.patch('cumin.cli.stderr')
- @mock.patch('cumin.cli.raw_input')
- @mock.patch('cumin.cli.sys.stdout.isatty')
- @mock.patch('cumin.cli.logger')
- def test_sigint_handler(self, logging, isatty, mocked_raw_input, stderr):
# pylint: disable=unused-argument
- """Calling the SIGINT handler should raise KeyboardInterrupt or not
based on tty and answer."""
- # Signal handler called without a tty
- isatty.return_value = False
- with self.assertRaises(cli.KeyboardInterruptError):
- cli.sigint_handler(1, None)
- # Signal handler called with a tty
- isatty.return_value = True
- with self.assertRaises(cli.KeyboardInterruptError):
- cli.sigint_handler(1, None)
+def test_parse_config_ok():
+ """The configuration file is properly parsed and accessible."""
+ config = cli.parse_config('doc/examples/config.yaml')
+ assert 'log_file' in config
- # # Signal handler called with a tty, answered 'y'
- # isatty.return_value = True
- # mocked_raw_input.return_value = 'y'
- # with self.assertRaises(cli.KeyboardInterruptError):
- # cli.sigint_handler(1, None)
- #
- # # Signal handler called with a tty, answered 'n'
- # isatty.return_value = True
- # mocked_raw_input.return_value = 'n'
- # self.assertIsNone(cli.sigint_handler(1, None))
- #
- # # Signal handler called with a tty, answered 'invalid_answer'
- # isatty.return_value = True
- # mocked_raw_input.return_value = 'invalid_answer'
- # with self.assertRaises(cli.KeyboardInterruptError):
- # cli.sigint_handler(1, None)
- #
- # # Signal handler called with a tty, empty answer
- # isatty.return_value = True
- # mocked_raw_input.return_value = ''
- # with self.assertRaises(cli.KeyboardInterruptError):
- # cli.sigint_handler(1, None)
- @mock.patch('cumin.cli.tqdm')
- def test_stderr(self, tqdm):
- """Calling stderr() should call tqdm.write()."""
- cli.stderr('message')
- self.assertTrue(tqdm.write.called)
+def test_parse_config_non_existent():
+ """A CuminError is raised if the configuration file is not available."""
+ with pytest.raises(CuminError, match='Unable to read configuration file'):
+ cli.parse_config('not_existent_config.yaml')
- @mock.patch('cumin.cli.stderr')
- @mock.patch('cumin.cli.raw_input')
- @mock.patch('cumin.cli.sys.stdout.isatty')
- def test_get_hosts_ok(self, isatty, mocked_raw_input, stderr):
- """Calling get_hosts() should query the backend and return the list of
hosts."""
- args = cli.parse_args(argv=['host1', 'command1'])
- config = {'backend': 'direct'}
- isatty.return_value = True
- mocked_raw_input.return_value = 'y'
- self.assertListEqual(cli.get_hosts(args, config), ['host1'])
+def test_parse_config_invalid():
+ """A CuminError is raised if the configuration cannot be parsed."""
+ invalid_yaml = '\n'.join((
+ 'foo:',
+ ' bar: baz',
+ ' - foobar',
+ ))
+ tmpfile, tmpfilepath = tempfile.mkstemp(suffix='config.yaml',
prefix='cumin', text=True)
+ os.write(tmpfile, invalid_yaml)
- mocked_raw_input.return_value = 'n'
- with self.assertRaises(cli.KeyboardInterruptError):
- cli.get_hosts(args, config)
+ with pytest.raises(CuminError, match='Unable to parse configuration file'):
+ cli.parse_config(tmpfilepath)
- mocked_raw_input.return_value = 'invalid_answer'
- with self.assertRaises(cli.KeyboardInterruptError):
- cli.get_hosts(args, config)
- mocked_raw_input.return_value = ''
- with self.assertRaises(cli.KeyboardInterruptError):
- cli.get_hosts(args, config)
+@mock.patch('cumin.cli.stderr')
+@mock.patch('cumin.cli.raw_input')
+@mock.patch('cumin.cli.sys.stdout.isatty')
+@mock.patch('cumin.cli.logger')
+def test_sigint_handler(logging, isatty, mocked_raw_input, stderr): # pylint:
disable=unused-argument
+ """Calling the SIGINT handler should raise KeyboardInterrupt or not based
on tty and answer."""
+ # Signal handler called without a tty
+ isatty.return_value = False
+ with pytest.raises(cli.KeyboardInterruptError):
+ cli.sigint_handler(1, None)
- self.assertTrue(stderr.called)
+ # Signal handler called with a tty
+ isatty.return_value = True
+ with pytest.raises(cli.KeyboardInterruptError):
+ cli.sigint_handler(1, None)
- @mock.patch('cumin.cli.stderr')
- @mock.patch('cumin.cli.sys.stdout.isatty')
- def test_get_hosts_no_tty_ko(self, isatty, stderr):
- """Calling get_hosts() without a TTY should raise RuntimeError if
--dry-run or --force are not specified."""
- args = cli.parse_args(argv=['host1', 'command1'])
- config = {'backend': 'direct'}
- isatty.return_value = False
- with self.assertRaisesRegexp(CuminError, 'Not in a TTY but neither
DRY-RUN nor FORCE mode were specified'):
- cli.get_hosts(args, config)
- self.assertTrue(stderr.called)
+ # # Signal handler called with a tty, answered 'y'
+ # isatty.return_value = True
+ # mocked_raw_input.return_value = 'y'
+ # with pytest.raises(cli.KeyboardInterruptError):
+ # cli.sigint_handler(1, None)
+ #
+ # # Signal handler called with a tty, answered 'n'
+ # isatty.return_value = True
+ # mocked_raw_input.return_value = 'n'
+ # assert cli.sigint_handler(1, None) is None
+ #
+ # # Signal handler called with a tty, answered 'invalid_answer'
+ # isatty.return_value = True
+ # mocked_raw_input.return_value = 'invalid_answer'
+ # with pytest.raises(cli.KeyboardInterruptError):
+ # cli.sigint_handler(1, None)
+ #
+ # # Signal handler called with a tty, empty answer
+ # isatty.return_value = True
+ # mocked_raw_input.return_value = ''
+ # with pytest.raises(cli.KeyboardInterruptError):
+ # cli.sigint_handler(1, None)
- @mock.patch('cumin.cli.stderr')
- @mock.patch('cumin.cli.sys.stdout.isatty')
- def test_get_hosts_no_tty_dry_run(self, isatty, stderr):
- """Calling get_hosts() with or without a TTY with --dry-run should
return an empty list."""
- args = cli.parse_args(argv=['--dry-run', 'host1', 'command1'])
- config = {'backend': 'direct'}
- self.assertListEqual(cli.get_hosts(args, config), [])
- isatty.return_value = True
- self.assertListEqual(cli.get_hosts(args, config), [])
- self.assertTrue(stderr.called)
- @mock.patch('cumin.cli.stderr')
- @mock.patch('cumin.cli.sys.stdout.isatty')
- def test_get_hosts_no_tty_force(self, isatty, stderr):
- """Calling get_hosts() with or without a TTY with --force should
return the list of hosts."""
- args = cli.parse_args(argv=['--force', 'host1', 'command1'])
- config = {'backend': 'direct'}
- self.assertListEqual(cli.get_hosts(args, config), ['host1'])
- isatty.return_value = True
- self.assertListEqual(cli.get_hosts(args, config), ['host1'])
- self.assertTrue(stderr.called)
+@mock.patch('cumin.cli.tqdm')
+def test_stderr(tqdm):
+ """Calling stderr() should call tqdm.write()."""
+ cli.stderr('message')
+ assert tqdm.write.called
- @mock.patch('cumin.cli.Transport')
- @mock.patch('cumin.cli.stderr')
- def test_run(self, stderr, transport):
- """Calling run() should query the hosts and execute the commands on
the transport."""
- args = cli.parse_args(argv=['--force', 'host1', 'command1'])
- config = {'backend': 'direct', 'transport': 'clustershell'}
- cli.run(args, config)
- transport.new.assert_called_once_with(config, cli.logger)
- self.assertTrue(stderr.called)
+
+@mock.patch('cumin.cli.stderr')
+@mock.patch('cumin.cli.raw_input')
+@mock.patch('cumin.cli.sys.stdout.isatty')
+def test_get_hosts_ok(isatty, mocked_raw_input, stderr):
+ """Calling get_hosts() should query the backend and return the list of
hosts."""
+ args = cli.parse_args(argv=['host1', 'command1'])
+ config = {'backend': 'direct'}
+ isatty.return_value = True
+
+ mocked_raw_input.return_value = 'y'
+ assert cli.get_hosts(args, config) == ['host1']
+
+ mocked_raw_input.return_value = 'n'
+ with pytest.raises(cli.KeyboardInterruptError):
+ cli.get_hosts(args, config)
+
+ mocked_raw_input.return_value = 'invalid_answer'
+ with pytest.raises(cli.KeyboardInterruptError):
+ cli.get_hosts(args, config)
+
+ mocked_raw_input.return_value = ''
+ with pytest.raises(cli.KeyboardInterruptError):
+ cli.get_hosts(args, config)
+
+ assert stderr.called
+
+
+@mock.patch('cumin.cli.stderr')
+@mock.patch('cumin.cli.sys.stdout.isatty')
+def test_get_hosts_no_tty_ko(isatty, stderr):
+ """Calling get_hosts() without a TTY should raise RuntimeError if
--dry-run or --force are not specified."""
+ args = cli.parse_args(argv=['host1', 'command1'])
+ config = {'backend': 'direct'}
+ isatty.return_value = False
+ with pytest.raises(CuminError, match='Not in a TTY but neither DRY-RUN nor
FORCE mode were specified'):
+ cli.get_hosts(args, config)
+ assert stderr.called
+
+
+@mock.patch('cumin.cli.stderr')
+@mock.patch('cumin.cli.sys.stdout.isatty')
+def test_get_hosts_no_tty_dry_run(isatty, stderr):
+ """Calling get_hosts() with or without a TTY with --dry-run should return
an empty list."""
+ args = cli.parse_args(argv=['--dry-run', 'host1', 'command1'])
+ config = {'backend': 'direct'}
+ assert cli.get_hosts(args, config) == []
+ isatty.return_value = True
+ assert cli.get_hosts(args, config) == []
+ assert stderr.called
+
+
+@mock.patch('cumin.cli.stderr')
+@mock.patch('cumin.cli.sys.stdout.isatty')
+def test_get_hosts_no_tty_force(isatty, stderr):
+ """Calling get_hosts() with or without a TTY with --force should return
the list of hosts."""
+ args = cli.parse_args(argv=['--force', 'host1', 'command1'])
+ config = {'backend': 'direct'}
+ assert cli.get_hosts(args, config) == ['host1']
+ isatty.return_value = True
+ assert cli.get_hosts(args, config) == ['host1']
+ assert stderr.called
+
+
+@mock.patch('cumin.cli.Transport')
+@mock.patch('cumin.cli.stderr')
+def test_run(stderr, transport):
+ """Calling run() should query the hosts and execute the commands on the
transport."""
+ args = cli.parse_args(argv=['--force', 'host1', 'command1'])
+ config = {'backend': 'direct', 'transport': 'clustershell'}
+ cli.run(args, config)
+ transport.new.assert_called_once_with(config, cli.logger)
+ assert stderr.called
diff --git a/cumin/tests/unit/test_grammar.py b/cumin/tests/unit/test_grammar.py
index dfd77e9..2c78319 100644
--- a/cumin/tests/unit/test_grammar.py
+++ b/cumin/tests/unit/test_grammar.py
@@ -1,7 +1,5 @@
"""Grammar tests."""
-import unittest
-
from cumin.grammar import grammar
from cumin.tests import get_fixture
@@ -13,27 +11,27 @@
return token, expected
-class TestGrammar(unittest.TestCase):
- """Grammar class tests."""
+def test_valid_strings():
+ """Run quick pyparsing test over valid grammar strings."""
+ results = grammar.runTests(get_fixture('valid_grammars.txt',
as_string=True))
+ assert results[0]
- def test_valid_strings(self):
- """Run quick pyparsing test over valid grammar strings."""
- results = grammar.runTests(get_fixture('valid_grammars.txt',
as_string=True))
- self.assertTrue(results[0])
- def test_invalid_strings(self):
- """Run quick pyparsing test over invalid grammar strings."""
- results = grammar.runTests(get_fixture('invalid_grammars.txt',
as_string=True), failureTests=True)
- self.assertTrue(results[0])
+def test_invalid_strings():
+ """Run quick pyparsing test over invalid grammar strings."""
+ results = grammar.runTests(get_fixture('invalid_grammars.txt',
as_string=True), failureTests=True)
+ assert results[0]
- def test_single_category_key_token(self):
- """A valid single token with a category that has key is properly
parsed and interpreted."""
- token, expected = _get_category_key_token()
- parsed = grammar.parseString(token, parseAll=True)
- self.assertDictEqual(parsed[0].asDict(), expected)
- def test_hosts_selection(self):
- """A host selection is properly parsed and interpreted."""
- hosts = {'hosts': 'host[10-20,30-40].domain'}
- parsed = grammar.parseString(hosts['hosts'], parseAll=True)
- self.assertDictEqual(parsed[0].asDict(), hosts)
+def test_single_category_key_token():
+ """A valid single token with a category that has key is properly parsed
and interpreted."""
+ token, expected = _get_category_key_token()
+ parsed = grammar.parseString(token, parseAll=True)
+ assert parsed[0].asDict() == expected
+
+
+def test_hosts_selection():
+ """A host selection is properly parsed and interpreted."""
+ hosts = {'hosts': 'host[10-20,30-40].domain'}
+ parsed = grammar.parseString(hosts['hosts'], parseAll=True)
+ assert parsed[0].asDict() == hosts
diff --git a/cumin/tests/unit/test_query.py b/cumin/tests/unit/test_query.py
index 7c2b88d..4dbd2ee 100644
--- a/cumin/tests/unit/test_query.py
+++ b/cumin/tests/unit/test_query.py
@@ -3,9 +3,9 @@
import logging
import os
import pkgutil
-import unittest
import mock
+import pytest
from ClusterShell.NodeSet import NodeSet
from pyparsing import ParseException
@@ -28,12 +28,14 @@
return mock.MagicMock(spec_set=BaseQuery)
-class TestQuery(unittest.TestCase):
+class TestQuery(object):
"""Query factory class tests."""
+
+ # pylint: disable=no-self-use
def test_invalid_backend(self):
"""Passing an invalid backend should raise RuntimeError."""
- with self.assertRaisesRegexp(RuntimeError, r"ImportError\('No module
named non_existent_backend'"):
+ with pytest.raises(RuntimeError, match=r"ImportError\('No module named
non_existent_backend'"):
Query.new({'backend': 'non_existent_backend'})
def test_missing_query_class(self):
@@ -41,17 +43,17 @@
module = mock.MagicMock()
del module.query_class
with mock.patch('importlib.import_module', lambda _: module):
- with self.assertRaisesRegexp(RuntimeError,
r"AttributeError\('query_class'"):
+ with pytest.raises(RuntimeError,
match=r"AttributeError\('query_class'"):
Query.new({'backend': 'invalid_backend'})
def test_valid_backend(self):
"""Passing a valid backend should return an instance of BaseQuery."""
backends = [name for _, name, _ in
pkgutil.iter_modules([os.path.join('cumin', 'backends')])]
for backend in backends:
- self.assertIsInstance(Query.new({'backend': backend}), BaseQuery)
+ assert isinstance(Query.new({'backend': backend}), BaseQuery)
-class TestQueryBuilder(unittest.TestCase):
+class TestQueryBuilder(object):
"""Class QueryBuilder tests."""
query_string = 'host1 or (not F:key1 = value and R:key2 ~ regex) or host2'
@@ -62,10 +64,10 @@
def test_instantiation(self):
"""Class QueryBuilder should create an instance of a query_class for
the given backend."""
query_builder = QueryBuilder(self.query_string, self.config)
- self.assertIsInstance(query_builder, QueryBuilder)
- self.assertIsInstance(query_builder.query, BaseQuery)
- self.assertEqual(query_builder.query_string, self.query_string)
- self.assertEqual(query_builder.level, 0)
+ assert isinstance(query_builder, QueryBuilder)
+ assert isinstance(query_builder.query, BaseQuery)
+ assert query_builder.query_string == self.query_string
+ assert query_builder.level == 0
@mock.patch('cumin.query.Query', QueryFactory)
def test_build_valid(self):
@@ -94,12 +96,12 @@
def test_build_invalid(self):
"""QueryBuilder.build() should raise ParseException for an invalid
query."""
query_builder = QueryBuilder(self.invalid_query_string, self.config)
- with self.assertRaisesRegexp(ParseException, r"Expected end of text"):
+ with pytest.raises(ParseException, match='Expected end of text'):
query_builder.build()
@mock.patch('cumin.query.Query', QueryFactory)
def test__parse_token(self):
"""QueryBuilder._parse_token() should raise RuntimeError for an
invalid token."""
query_builder = QueryBuilder(self.invalid_query_string, self.config)
- with self.assertRaisesRegexp(RuntimeError, r"Invalid query string
syntax"):
+ with pytest.raises(RuntimeError, match='Invalid query string syntax'):
query_builder._parse_token('invalid_token') # pylint:
disable=protected-access
diff --git a/cumin/tests/unit/test_transport.py
b/cumin/tests/unit/test_transport.py
index 1aa7bd4..2b10ed6 100644
--- a/cumin/tests/unit/test_transport.py
+++ b/cumin/tests/unit/test_transport.py
@@ -2,32 +2,31 @@
import os
import pkgutil
-import unittest
+import pytest
import mock
from cumin.transport import Transport
from cumin.transports import BaseWorker
-class TestTransport(unittest.TestCase):
- """Transport factory class tests."""
+def test_invalid_transport():
+ """Passing an invalid transport should raise RuntimeError."""
+ with pytest.raises(RuntimeError, match=r"ImportError\('No module named
non_existent_transport'"):
+ Transport.new({'transport': 'non_existent_transport'})
- def test_invalid_transport(self):
- """Passing an invalid transport should raise RuntimeError."""
- with self.assertRaisesRegexp(RuntimeError, r"ImportError\('No module
named non_existent_transport'"):
- Transport.new({'transport': 'non_existent_transport'})
- def test_missing_worker_class(self):
- """Passing a transport without a defined worker_class should raise
RuntimeError."""
- module = mock.MagicMock()
- del module.worker_class
- with mock.patch('importlib.import_module', lambda _: module):
- with self.assertRaisesRegexp(RuntimeError,
r"AttributeError\('worker_class'"):
- Transport.new({'transport': 'invalid_transport'})
+def test_missing_worker_class():
+ """Passing a transport without a defined worker_class should raise
RuntimeError."""
+ module = mock.MagicMock()
+ del module.worker_class
+ with mock.patch('importlib.import_module', lambda _: module):
+ with pytest.raises(RuntimeError,
match=r"AttributeError\('worker_class'"):
+ Transport.new({'transport': 'invalid_transport'})
- def test_valid_transport(self):
- """Passing a valid transport should return an instance of
BaseWorker."""
- transports = [name for _, name, _ in
pkgutil.iter_modules([os.path.join('cumin', 'transports')])]
- for transport in transports:
- self.assertIsInstance(Transport.new({'transport': transport}),
BaseWorker)
+
+def test_valid_transport():
+ """Passing a valid transport should return an instance of BaseWorker."""
+ transports = [name for _, name, _ in
pkgutil.iter_modules([os.path.join('cumin', 'transports')])]
+ for transport in transports:
+ assert isinstance(Transport.new({'transport': transport}), BaseWorker)
diff --git a/cumin/tests/unit/transports/test_clustershell.py
b/cumin/tests/unit/transports/test_clustershell.py
index 180e8ac..b6f41b5 100644
--- a/cumin/tests/unit/transports/test_clustershell.py
+++ b/cumin/tests/unit/transports/test_clustershell.py
@@ -1,38 +1,32 @@
"""ClusterShell transport tests."""
-# pylint: disable=invalid-name,no-member,protected-access
-import unittest
+# pylint:
disable=invalid-name,no-member,protected-access,attribute-defined-outside-init
import mock
+import pytest
from cumin.transports import BaseWorker, Command, clustershell, State,
WorkerError
-class TestNode(unittest.TestCase):
- """Node class tests."""
-
- def test_instantiation(self):
- """Default values should be set when a Node instance is created."""
- node = clustershell.Node('name', [Command('command1'),
Command('command2')])
- self.assertEqual(node.running_command_index, -1)
- self.assertIsInstance(node.state, State)
+def test_node_class_instantiation():
+ """Default values should be set when a Node instance is created."""
+ node = clustershell.Node('name', [Command('command1'),
Command('command2')])
+ assert node.running_command_index == -1
+ assert isinstance(node.state, State)
-class TestWorkerClass(unittest.TestCase):
- """ClusterShell backend worker_class test class."""
-
- @mock.patch('cumin.transports.clustershell.Task.task_self')
- def test_worker_class(self, task_self):
- """An instance of worker_class should be an instance of BaseWorker."""
- worker = clustershell.worker_class({})
- self.assertIsInstance(worker, BaseWorker)
- task_self.assert_called_once_with()
+@mock.patch('cumin.transports.clustershell.Task.task_self')
+def test_worker_class(task_self):
+ """An instance of worker_class should be an instance of BaseWorker."""
+ worker = clustershell.worker_class({})
+ assert isinstance(worker, BaseWorker)
+ task_self.assert_called_once_with()
-class TestClusterShellWorker(unittest.TestCase):
+class TestClusterShellWorker(object):
"""ClusterShell backend worker test class."""
@mock.patch('cumin.transports.clustershell.Task.task_self')
- def setUp(self, task_self): # pylint: disable=arguments-differ
+ def setup_method(self, _, task_self): # pylint: disable=arguments-differ
"""Initialize default properties and instances"""
self.config = {
'clustershell': {
@@ -57,7 +51,7 @@
def test_instantiation(self, task_self):
"""An instance of ClusterShellWorker should be an instance of
BaseWorker and initialize ClusterShell."""
worker = clustershell.ClusterShellWorker(self.config)
- self.assertIsInstance(worker, BaseWorker)
+ assert isinstance(worker, BaseWorker)
task_self.assert_called_once_with()
worker.task.set_info.assert_has_calls(
[mock.call('fanout', 3),
@@ -69,7 +63,7 @@
self.worker.execute()
self.worker.task.shell.assert_called_once_with(
'command1', nodes=self.nodes_set,
handler=self.worker._handler_instance, timeout=None)
- self.assertTrue(clustershell.DEFAULT_HANDLERS['sync'].called)
+ assert clustershell.DEFAULT_HANDLERS['sync'].called
def test_execute_default_async_handler(self):
"""Calling execute() in async mode without event handler should use
the default async event handler."""
@@ -77,7 +71,7 @@
self.worker.execute()
self.worker.task.shell.assert_called_once_with(
'command1', nodes=self.nodes_set,
handler=self.worker._handler_instance, timeout=None)
- self.assertTrue(clustershell.DEFAULT_HANDLERS['async'].called)
+ assert clustershell.DEFAULT_HANDLERS['async'].called
def test_execute_timeout(self):
"""Calling execute() and let the global timeout expire should call
on_timeout."""
@@ -90,7 +84,7 @@
"""Calling execute() using a custom handler should call ClusterShell
task with the custom event handler."""
self.worker.handler = ConcreteBaseEventHandler
self.worker.execute()
- self.assertIsInstance(self.worker._handler_instance,
ConcreteBaseEventHandler)
+ assert isinstance(self.worker._handler_instance,
ConcreteBaseEventHandler)
self.worker.task.shell.assert_called_once_with(
'command1', nodes=self.nodes_set,
handler=self.worker._handler_instance, timeout=None)
@@ -98,17 +92,17 @@
"""Calling execute() without commands should return without doing
anything."""
self.worker.commands = []
self.worker.execute()
- self.assertFalse(self.worker.task.shell.called)
+ assert not self.worker.task.shell.called
def test_execute_one_command_no_mode(self):
"""Calling execute() with only one command without mode should raise
exception."""
self.worker.commands = [self.commands[0]]
- with self.assertRaisesRegexp(RuntimeError, 'An EventHandler is
mandatory.'):
+ with pytest.raises(RuntimeError, match=r'An EventHandler is
mandatory\.'):
self.worker.execute()
def test_execute_wrong_mode(self):
"""Calling execute() without setting the mode with multiple commands
should raise RuntimeError."""
- with self.assertRaisesRegexp(RuntimeError, r'An EventHandler is
mandatory.'):
+ with pytest.raises(RuntimeError, match=r'An EventHandler is
mandatory\.'):
self.worker.execute()
def test_execute_batch_size(self):
@@ -130,14 +124,14 @@
output = None
for nodes, output in self.worker.get_results():
pass
- self.assertEqual(str(nodes), 'node[90-92]')
- self.assertEqual(output, 'output 9')
+ assert str(nodes) == 'node[90-92]'
+ assert output == 'output 9'
def test_handler_getter(self):
"""Access to handler property should return the handler class or
None"""
- self.assertIsNone(self.worker.handler)
+ assert self.worker.handler is None
self.worker.handler = 'sync'
- self.assertEqual(self.worker._handler,
clustershell.DEFAULT_HANDLERS['sync'])
+ assert self.worker._handler == clustershell.DEFAULT_HANDLERS['sync']
def test_handler_setter_invalid(self):
"""Raise WorkerError if trying to set it to an invalid class or
value"""
@@ -146,26 +140,26 @@
pass
- with self.assertRaisesRegexp(WorkerError, r'handler must be one of'):
+ with pytest.raises(WorkerError, match='handler must be one of'):
self.worker.handler = 'invalid-handler'
- with self.assertRaisesRegexp(WorkerError, r'handler must be one of'):
+ with pytest.raises(WorkerError, match='handler must be one of'):
self.worker.handler = InvalidClass
def test_handler_setter_default_sync(self):
"""Should set the handler to the default handler for the sync mode"""
self.worker.handler = 'sync'
- self.assertEqual(self.worker._handler,
clustershell.DEFAULT_HANDLERS['sync'])
+ assert self.worker._handler == clustershell.DEFAULT_HANDLERS['sync']
def test_handler_setter_default_async(self):
"""Should set the handler to the default handler for the async mode"""
self.worker.handler = 'async'
- self.assertEqual(self.worker._handler,
clustershell.DEFAULT_HANDLERS['async'])
+ assert self.worker._handler == clustershell.DEFAULT_HANDLERS['async']
def test_handler_setter_custom(self):
"""Should set the handler to the given custom class that inherit from
BaseEventHandler"""
self.worker.handler = ConcreteBaseEventHandler
- self.assertEqual(self.worker._handler, ConcreteBaseEventHandler)
+ assert self.worker._handler == ConcreteBaseEventHandler
@staticmethod
def iter_buffers():
@@ -174,10 +168,10 @@
yield 'output {}'.format(i), ['node{}0'.format(i),
'node{}1'.format(i), 'node{}2'.format(i)]
-class TestBaseEventHandler(unittest.TestCase):
+class TestBaseEventHandler(object):
"""BaseEventHandler test class."""
- def setUp(self, *args): # pylint: disable=arguments-differ
+ def setup_method(self, *args): # pylint: disable=arguments-differ
"""Initialize default properties and instances."""
self.nodes = ['node1', 'node2']
self.commands = [Command('command1', ok_codes=[0, 100]),
Command('command2', timeout=5)]
@@ -192,7 +186,7 @@
def test_close(self, colorama):
"""Calling close should raise NotImplementedError."""
self.handler = clustershell.BaseEventHandler(self.nodes, self.commands)
- with self.assertRaises(NotImplementedError):
+ with pytest.raises(NotImplementedError):
self.handler.close(self.worker)
colorama.init.assert_called_once_with()
@@ -215,9 +209,9 @@
@mock.patch('cumin.transports.clustershell.colorama')
@mock.patch('cumin.transports.clustershell.tqdm')
- def setUp(self, tqdm, colorama): # pylint: disable=arguments-differ
+ def setup_method(self, _, tqdm, colorama): # pylint:
disable=arguments-differ
"""Initialize default properties and instances."""
- super(TestConcreteBaseEventHandler, self).setUp()
+ super(TestConcreteBaseEventHandler, self).setup_method()
self.handler = ConcreteBaseEventHandler(
self.nodes, self.commands, batch_size=len(self.nodes),
batch_sleep=0.0, first_batch=self.nodes)
self.worker.eh = self.handler
@@ -226,7 +220,7 @@
def test_instantiation(self):
"""An instance of ConcreteBaseEventHandler should be an instance of
BaseEventHandler and initialize colorama."""
- self.assertListEqual(sorted(self.handler.nodes.keys()), self.nodes)
+ assert sorted(self.handler.nodes.keys()) == self.nodes
self.colorama.init.assert_called_once_with()
@mock.patch('cumin.transports.clustershell.tqdm')
@@ -238,19 +232,19 @@
self.worker.task.num_timeout.return_value = 1
self.worker.task.iter_keys_timeout.return_value = [self.nodes[0]]
- self.assertFalse(self.handler.global_timedout)
+ assert not self.handler.global_timedout
self.handler.on_timeout(self.worker.task)
- self.assertTrue(self.handler.pbar_ko.update.called)
- self.assertTrue(self.handler.global_timedout)
- self.assertTrue(tqdm.write.called)
+ assert self.handler.pbar_ko.update.called
+ assert self.handler.global_timedout
+ assert tqdm.write.called
def test_ev_pickup(self):
"""Calling ev_pickup() should set the state of the current node to
running."""
for node in self.nodes:
self.worker.current_node = node
self.handler.ev_pickup(self.worker)
- self.assertListEqual([node for node in
self.worker.eh.nodes.itervalues() if node.state.is_running],
- self.worker.eh.nodes.values())
+ running_nodes = [node for node in self.worker.eh.nodes.itervalues() if
node.state.is_running]
+ assert running_nodes == self.worker.eh.nodes.values()
@mock.patch('cumin.transports.clustershell.tqdm')
def test_ev_read_many_hosts(self, tqdm):
@@ -259,7 +253,7 @@
self.worker.current_node = node
self.worker.current_msg = 'Node output'
self.handler.ev_read(self.worker)
- self.assertFalse(tqdm.write.called)
+ assert not tqdm.write.called
@mock.patch('cumin.transports.clustershell.tqdm')
def test_ev_read_single_host(self, tqdm):
@@ -282,10 +276,10 @@
self.worker.current_node = node
self.handler.ev_pickup(self.worker)
- self.assertEqual(self.handler.counters['timeout'], 0)
+ assert self.handler.counters['timeout'] == 0
self.worker.task.num_timeout.return_value = 2
self.handler.ev_timeout(self.worker)
- self.assertEqual(self.handler.counters['timeout'], 2)
+ assert self.handler.counters['timeout'] == 2
class TestSyncEventHandler(TestBaseEventHandler):
@@ -294,9 +288,9 @@
@mock.patch('cumin.transports.clustershell.logging')
@mock.patch('cumin.transports.clustershell.colorama')
@mock.patch('cumin.transports.clustershell.tqdm')
- def setUp(self, tqdm, colorama, logger): # pylint:
disable=arguments-differ
+ def setup_method(self, _, tqdm, colorama, logger): # pylint:
disable=arguments-differ
"""Initialize default properties and instances."""
- super(TestSyncEventHandler, self).setUp()
+ super(TestSyncEventHandler, self).setup_method()
self.handler = clustershell.SyncEventHandler(
self.nodes, self.commands, success_threshold=1,
batch_size=len(self.nodes), batch_sleep=0, logger=None,
first_batch=self.nodes)
@@ -307,14 +301,14 @@
def test_instantiation(self):
"""An instance of SyncEventHandler should be an instance of
BaseEventHandler."""
- self.assertIsInstance(self.handler, clustershell.BaseEventHandler)
+ assert isinstance(self.handler, clustershell.BaseEventHandler)
@mock.patch('cumin.transports.clustershell.tqdm')
def test_start_command_no_schedule(self, tqdm):
"""Calling start_command() should reset the success counter and
initialize the progress bars."""
self.handler.start_command()
- self.assertTrue(tqdm.called)
- self.assertEqual(self.handler.counters['success'], 0)
+ assert tqdm.called
+ assert self.handler.counters['success'] == 0
@mock.patch('cumin.transports.clustershell.Task.task_self')
@mock.patch('cumin.transports.clustershell.tqdm')
@@ -327,24 +321,24 @@
node.state.update(clustershell.State.pending)
self.handler.start_command(schedule=True)
- self.assertTrue(tqdm.called)
- self.assertEqual(self.handler.counters['success'], 0)
- self.assertListEqual(sorted(node.name for node in
self.handler.nodes.itervalues() if node.state.is_scheduled),
- sorted(['node1', 'node2']))
- self.assertTrue(task_self.called)
+ assert tqdm.called
+ assert self.handler.counters['success'] == 0
+ scheduled_nodes = sorted(node.name for node in
self.handler.nodes.itervalues() if node.state.is_scheduled)
+ assert scheduled_nodes == sorted(['node1', 'node2'])
+ assert task_self.called
@mock.patch('cumin.transports.clustershell.tqdm')
def test_end_command(self, tqdm):
"""Calling end_command() should wrap up the command execution."""
- self.assertFalse(self.handler.end_command())
+ assert not self.handler.end_command()
self.handler.counters['success'] = 2
- self.assertTrue(self.handler.end_command())
+ assert self.handler.end_command()
self.handler.kwargs['success_threshold'] = 0.5
self.handler.counters['success'] = 1
- self.assertTrue(self.handler.end_command())
+ assert self.handler.end_command()
self.handler.current_command_index = 1
- self.assertFalse(self.handler.end_command())
- self.assertTrue(tqdm.write.called)
+ assert not self.handler.end_command()
+ assert tqdm.write.called
@mock.patch('cumin.transports.clustershell.tqdm')
def test_on_timeout(self, tqdm):
@@ -352,7 +346,7 @@
self.worker.task.num_timeout.return_value = 0
self.worker.task.iter_keys_timeout.return_value = []
self.handler.on_timeout(self.worker.task)
- self.assertTrue(tqdm.write.called)
+ assert tqdm.write.called
def test_ev_timer(self):
"""Calling ev_timer() should schedule the execution of the next
node/command."""
@@ -365,9 +359,9 @@
self.worker.current_rc = 100
self.handler.ev_pickup(self.worker)
self.handler.ev_hup(self.worker)
- self.assertTrue(self.handler.pbar_ok.update.called)
- self.assertFalse(timer.called)
-
self.assertTrue(self.handler.nodes[self.worker.current_node].state.is_success)
+ assert self.handler.pbar_ok.update.called
+ assert not timer.called
+ assert self.handler.nodes[self.worker.current_node].state.is_success
@mock.patch('cumin.transports.clustershell.Task.Task.timer')
def test_ev_hup_ko(self, timer):
@@ -375,16 +369,16 @@
self.worker.current_rc = 1
self.handler.ev_pickup(self.worker)
self.handler.ev_hup(self.worker)
- self.assertTrue(self.handler.pbar_ko.update.called)
- self.assertFalse(timer.called)
-
self.assertTrue(self.handler.nodes[self.worker.current_node].state.is_failed)
+ assert self.handler.pbar_ko.update.called
+ assert not timer.called
+ assert self.handler.nodes[self.worker.current_node].state.is_failed
@mock.patch('cumin.transports.clustershell.tqdm')
def test_close(self, tqdm): # pylint: disable=arguments-differ
"""Calling close should print the report when needed."""
self.handler.current_command_index = 2
self.handler.close(self.worker)
- self.assertTrue(tqdm.write.called)
+ assert tqdm.write.called
class TestAsyncEventHandler(TestBaseEventHandler):
@@ -393,9 +387,9 @@
@mock.patch('cumin.transports.clustershell.logging')
@mock.patch('cumin.transports.clustershell.colorama')
@mock.patch('cumin.transports.clustershell.tqdm')
- def setUp(self, tqdm, colorama, logger): # pylint:
disable=arguments-differ
+ def setup_method(self, _, tqdm, colorama, logger): # pylint:
disable=arguments-differ
"""Initialize default properties and instances."""
- super(TestAsyncEventHandler, self).setUp()
+ super(TestAsyncEventHandler, self).setup_method()
self.handler = clustershell.AsyncEventHandler(self.nodes,
self.commands)
self.worker.eh = self.handler
self.tqdm = tqdm
@@ -404,8 +398,8 @@
def test_instantiation(self):
"""An instance of AsyncEventHandler should be an instance of
BaseEventHandler and initialize progress bars."""
- self.assertIsInstance(self.handler, clustershell.BaseEventHandler)
- self.assertTrue(self.handler.pbar_ok.refresh.called)
+ assert isinstance(self.handler, clustershell.BaseEventHandler)
+ assert self.handler.pbar_ok.refresh.called
def test_ev_hup_ok(self):
"""Calling ev_hup with a worker that has zero exit status should
enqueue the next command."""
@@ -422,8 +416,8 @@
self.handler.ev_pickup(self.worker)
self.worker.current_rc = 0
self.handler.ev_hup(self.worker)
- self.assertEqual(self.handler.counters['success'], 1)
- self.assertTrue(self.handler.pbar_ok.update.called)
+ assert self.handler.counters['success'] == 1
+ assert self.handler.pbar_ok.update.called
def test_ev_hup_ko(self):
"""Calling ev_hup with a worker that has non-zero exit status should
not enqueue the next command."""
@@ -432,7 +426,7 @@
self.handler.ev_pickup(self.worker)
self.worker.current_rc = 1
self.handler.ev_hup(self.worker)
- self.assertTrue(self.handler.pbar_ko.update.called)
+ assert self.handler.pbar_ko.update.called
def test_ev_timer(self):
"""Calling ev_timer() should schedule the execution of the next
node/command."""
@@ -445,5 +439,5 @@
self.worker.task.iter_buffers = TestClusterShellWorker.iter_buffers
self.worker.num_timeout.return_value = 0
self.handler.close(self.worker)
- self.assertTrue(self.handler.pbar_ok.close.called)
- self.assertTrue(tqdm.write.called)
+ assert self.handler.pbar_ok.close.called
+ assert tqdm.write.called
diff --git a/cumin/tests/unit/transports/test_init.py
b/cumin/tests/unit/transports/test_init.py
index 5911c2e..970a969 100644
--- a/cumin/tests/unit/transports/test_init.py
+++ b/cumin/tests/unit/transports/test_init.py
@@ -1,8 +1,7 @@
"""Transport tests."""
-# pylint: disable=protected-access
-import unittest
-
+# pylint: disable=protected-access,no-self-use
import mock
+import pytest
import cumin # noqa: F401 (dynamically used in TestCommand)
@@ -30,11 +29,12 @@
self._handler = value
-class TestCommand(unittest.TestCase):
+class TestCommand(object):
"""Command class tests."""
- def setUp(self):
+ def setup_method(self, _):
"""Initialize test commands."""
+ # pylint: disable=attribute-defined-outside-init
command_with_options = r'command --with "options" -a -n -d params
with\ spaces'
self.same_command_a = transports.Command(command_with_options)
self.same_command_b = r"command --with 'options' -a -n -d params
with\ spaces"
@@ -62,83 +62,84 @@
def test_instantiation(self):
"""A new Command instance should set the command property to the given
command."""
for command in self.commands:
- self.assertIsInstance(command['obj'], transports.Command)
- self.assertEqual(command['obj'].command, command['command'])
- self.assertEqual(command['obj']._timeout, command.get('timeout',
None))
- self.assertEqual(command['obj']._ok_codes, command.get('ok_codes',
None))
+ assert isinstance(command['obj'], transports.Command)
+ assert command['obj'].command == command['command']
+ assert command['obj']._timeout == command.get('timeout', None)
+ assert command['obj']._ok_codes == command.get('ok_codes', None)
def test_repr(self):
"""A repr of a Command should allow to instantiate an instance with
the same properties."""
for command in self.commands:
# Bandit and pylint would require to use ast.literal_eval, but it
will not work with objects
command_instance = eval(repr(command['obj'])) # nosec pylint:
disable=eval-used
- self.assertIsInstance(command_instance, transports.Command)
- self.assertEqual(repr(command_instance), repr(command['obj']))
- self.assertEqual(command_instance.command, command['obj'].command)
- self.assertEqual(command_instance._timeout,
command['obj']._timeout)
- self.assertEqual(command_instance._ok_codes,
command['obj']._ok_codes)
+ assert isinstance(command_instance, transports.Command)
+ assert repr(command_instance) == repr(command['obj'])
+ assert command_instance.command == command['obj'].command
+ assert command_instance._timeout == command['obj']._timeout
+ assert command_instance._ok_codes == command['obj']._ok_codes
def test_str(self):
"""A cast to string of a Command should return its command."""
for command in self.commands:
- self.assertEqual(str(command['obj']), command['command'])
+ assert str(command['obj']) == command['command']
def test_eq(self):
"""A Command instance can be compared to another or to a string with
the equality operator."""
for command in self.commands:
- self.assertEqual(command['obj'], transports.Command(
- command['command'], timeout=command.get('timeout', None),
ok_codes=command.get('ok_codes', None)))
+ assert command['obj'] == transports.Command(
+ command['command'], timeout=command.get('timeout', None),
ok_codes=command.get('ok_codes', None))
if command.get('timeout', None) is None and
command.get('ok_codes', None) is None:
- self.assertEqual(command['obj'], command['command'])
+ assert command['obj'] == command['command']
- with self.assertRaisesRegexp(ValueError, 'Unable to compare
instance of'):
+ with pytest.raises(ValueError, match='Unable to compare instance
of'):
command['obj'] == 1 # pylint: disable=pointless-statement
- self.assertEqual(self.same_command_a, self.same_command_b)
+ assert self.same_command_a == self.same_command_b
def test_ne(self):
"""A Command instance can be compared to another or to a string with
the inequality operator."""
# Just a basic test, all the cases are covered by the test_eq test
for command in self.commands:
# Different command with same or different properties
- self.assertNotEqual(command['obj'], transports.Command(
- self.different_command, timeout=command.get('timeout', None),
ok_codes=command.get('ok_codes', None)))
- self.assertNotEqual(command['obj'], transports.Command(
- self.different_command, timeout=999,
ok_codes=command.get('ok_codes', None)))
- self.assertNotEqual(command['obj'], transports.Command(
- self.different_command, timeout=command.get('timeout', None),
ok_codes=[99]))
- self.assertNotEqual(command['obj'],
transports.Command(self.different_command, timeout=999, ok_codes=[99]))
- self.assertNotEqual(command['obj'], self.different_command)
+ assert command['obj'] != transports.Command(
+ self.different_command, timeout=command.get('timeout', None),
ok_codes=command.get('ok_codes', None))
+ assert command['obj'] != transports.Command(
+ self.different_command, timeout=999,
ok_codes=command.get('ok_codes', None))
+ assert command['obj'] != transports.Command(
+ self.different_command, timeout=command.get('timeout', None),
ok_codes=[99])
+ assert command['obj'] !=
transports.Command(self.different_command, timeout=999, ok_codes=[99])
+ assert command['obj'] != self.different_command
# Same command, properties different
- self.assertNotEqual(command['obj'], transports.Command(
- command['command'], timeout=999,
ok_codes=command.get('ok_codes', None)))
- self.assertNotEqual(command['obj'], transports.Command(
- command['command'], timeout=command.get('timeout', None),
ok_codes=[99]))
- self.assertNotEqual(command['obj'],
transports.Command(command['command'], timeout=999, ok_codes=[99]))
+ assert command['obj'] != transports.Command(
+ command['command'], timeout=999,
ok_codes=command.get('ok_codes', None))
+ assert command['obj'] != transports.Command(
+ command['command'], timeout=command.get('timeout', None),
ok_codes=[99])
+ assert command['obj'] != transports.Command(command['command'],
timeout=999, ok_codes=[99])
if command.get('timeout', None) is not None or
command.get('ok_codes', None) is not None:
- self.assertNotEqual(command['obj'], command['command'])
+ assert command['obj'] != command['command']
- with self.assertRaisesRegexp(ValueError, 'Unable to compare
instance of'):
+ with pytest.raises(ValueError, match='Unable to compare instance
of'):
command['obj'] == 1 # pylint: disable=pointless-statement
def test_timeout_getter(self):
"""Should return the timeout set, None otherwise."""
for command in self.commands:
- self.assertAlmostEqual(command['obj'].timeout,
command['obj']._timeout)
+ if command['obj'].timeout is not None and command['obj']._timeout
is not None:
+ assert command['obj'].timeout ==
pytest.approx(command['obj']._timeout)
def test_timeout_setter(self):
"""Should set the timeout to its value, converted to float if integer.
Unset it if None is passed."""
command = transports.Command('command1')
command.timeout = 1.0
- self.assertAlmostEqual(command._timeout, 1.0)
+ assert command._timeout == pytest.approx(1.0)
command.timeout = None
- self.assertIsNone(command._timeout)
+ assert command._timeout is None
command.timeout = 1
- self.assertAlmostEqual(command._timeout, 1.0)
- with self.assertRaisesRegexp(transports.WorkerError, 'timeout must be
a positive float'):
+ assert command._timeout == pytest.approx(1.0)
+ with pytest.raises(transports.WorkerError, match='timeout must be a
positive float'):
command.timeout = -1.0
def test_ok_codes_getter(self):
@@ -146,93 +147,91 @@
# Test empty list
command = transports.Command('command1')
command.ok_codes = []
- self.assertListEqual(command.ok_codes, [])
+ assert command.ok_codes == []
for command in self.commands:
- self.assertListEqual(command['obj'].ok_codes,
command.get('ok_codes', [0]))
+ assert command['obj'].ok_codes == command.get('ok_codes', [0])
def test_ok_codes_setter(self):
"""Should set the ok_codes to its value, unset it if None is passed."""
command = transports.Command('command1')
- self.assertIsNone(command._ok_codes)
+ assert command._ok_codes is None
for i in xrange(256):
codes = [i]
command.ok_codes = codes
- self.assertListEqual(command._ok_codes, codes)
+ assert command._ok_codes == codes
codes.insert(0, 0)
command.ok_codes = codes
- self.assertListEqual(command._ok_codes, codes)
+ assert command._ok_codes == codes
command.ok_codes = None
- self.assertIsNone(command._ok_codes)
+ assert command._ok_codes is None
command.ok_codes = []
- self.assertListEqual(command._ok_codes, [])
+ assert command._ok_codes == []
- with self.assertRaisesRegexp(transports.WorkerError, r'ok_codes must
be a list or None'):
+ with pytest.raises(transports.WorkerError, match='ok_codes must be a
list or None'):
command.ok_codes = 'invalid_value'
- message_regex = r'must be a list of integers in the range'
+ message = 'must be a list of integers in the range'
for i in (-1, 0.0, 100.0, 256, 'invalid_value'):
codes = [i]
- with self.assertRaisesRegexp(transports.WorkerError,
message_regex):
+ with pytest.raises(transports.WorkerError, match=message):
command.ok_codes = codes
codes.insert(0, 0)
- with self.assertRaisesRegexp(transports.WorkerError,
message_regex):
+ with pytest.raises(transports.WorkerError, match=message):
command.ok_codes = codes
-class TestState(unittest.TestCase):
+class TestState(object):
"""State class tests."""
def test_instantiation_no_init(self):
"""A new State without an init value should start in the pending
state."""
state = transports.State()
- self.assertEqual(state._state, transports.State.pending)
+ assert state._state == transports.State.pending
def test_instantiation_init_ok(self):
"""A new State with a valid init value should start in this state."""
state = transports.State(init=transports.State.running)
- self.assertEqual(state._state, transports.State.running)
+ assert state._state == transports.State.running
def test_instantiation_init_ko(self):
"""A new State with an invalid init value should raise
InvalidStateError."""
- with self.assertRaisesRegexp(transports.InvalidStateError, 'is not a
valid state'):
+ with pytest.raises(transports.InvalidStateError, match='is not a valid
state'):
transports.State(init='invalid_state')
def test_getattr_current(self):
"""Accessing the 'current' property should return the current state."""
- self.assertEqual(transports.State().current, transports.State.pending)
+ assert transports.State().current == transports.State.pending
def test_getattr_is_valid_state(self):
"""Accessing a property named is_{a_valid_state_name} should return a
boolean."""
state = transports.State(init=transports.State.failed)
- self.assertFalse(state.is_pending)
- self.assertFalse(state.is_scheduled)
- self.assertFalse(state.is_running)
- self.assertFalse(state.is_timeout)
- self.assertFalse(state.is_success)
- self.assertTrue(state.is_failed)
+ assert not state.is_pending
+ assert not state.is_scheduled
+ assert not state.is_running
+ assert not state.is_timeout
+ assert not state.is_success
+ assert state.is_failed
def test_getattr_invalid_property(self):
"""Accessing a property with an invalid name should raise
AttributeError."""
state = transports.State(init=transports.State.failed)
- with self.assertRaisesRegexp(AttributeError, 'object has no
attribute'):
+ with pytest.raises(AttributeError, match='object has no attribute'):
state.invalid_property # pylint: disable=pointless-statement
def test_repr(self):
"""A State repr should return its representation that allows to
recreate the same State instance."""
- self.assertEqual(
- repr(transports.State()),
'cumin.transports.State(init={state})'.format(state=transports.State.pending))
+ assert repr(transports.State()) ==
'cumin.transports.State(init={state})'.format(state=transports.State.pending)
state = transports.State.running
- self.assertEqual(
- repr(transports.State(init=state)),
'cumin.transports.State(init={state})'.format(state=state))
+ assert repr(transports.State(init=state)) ==
'cumin.transports.State(init={state})'.format(state=state)
def test_str(self):
"""A State string should return its string representation."""
- self.assertEqual(str(transports.State()), 'pending')
- self.assertEqual(str(transports.State(init=transports.State.running)),
'running')
+ assert str(transports.State()) == 'pending'
+ assert str(transports.State(init=transports.State.running)) ==
'running'
def test_cmp_state(self):
"""Two State instance can be compared between each other."""
@@ -240,14 +239,14 @@
greater_state = transports.State(init=transports.State.failed)
same_state = transports.State()
- self.assertGreater(greater_state, state)
- self.assertGreaterEqual(greater_state, state)
- self.assertGreaterEqual(same_state, state)
- self.assertLess(state, greater_state)
- self.assertLessEqual(state, greater_state)
- self.assertLessEqual(state, same_state)
- self.assertEqual(state, same_state)
- self.assertNotEqual(state, greater_state)
+ assert greater_state > state
+ assert greater_state >= state
+ assert same_state >= state
+ assert state < greater_state
+ assert state <= greater_state
+ assert state <= same_state
+ assert state == same_state
+ assert state != greater_state
def test_cmp_int(self):
"""A State instance can be compared with integers."""
@@ -255,56 +254,56 @@
greater_state = transports.State.running
same_state = transports.State.pending
- self.assertGreater(greater_state, state)
- self.assertGreaterEqual(greater_state, state)
- self.assertGreaterEqual(same_state, state)
- self.assertLess(state, greater_state)
- self.assertLessEqual(state, greater_state)
- self.assertLessEqual(state, same_state)
- self.assertEqual(state, same_state)
- self.assertNotEqual(state, greater_state)
+ assert greater_state > state
+ assert greater_state >= state
+ assert same_state >= state
+ assert state < greater_state
+ assert state <= greater_state
+ assert state <= same_state
+ assert state == same_state
+ assert state != greater_state
def test_cmp_invalid(self):
"""Trying to compare a State instance with an invalid object should
raise ValueError."""
state = transports.State()
invalid_state = 'invalid_state'
- with self.assertRaisesRegexp(ValueError, 'Unable to compare instance'):
- self.assertEqual(state, invalid_state)
+ with pytest.raises(ValueError, match='Unable to compare instance'):
+ state == invalid_state # pylint: disable=pointless-statement
def test_update_invalid_state(self):
"""Trying to update a State with an invalid value should raise
ValueError."""
state = transports.State()
- with self.assertRaisesRegexp(ValueError, 'State must be one of'):
+ with pytest.raises(ValueError, match='State must be one of'):
state.update('invalid_state')
def test_update_invalid_transition(self):
"""Trying to update a State with an invalid transition should raise
StateTransitionError."""
state = transports.State()
- with self.assertRaisesRegexp(transports.StateTransitionError, 'the
allowed states are'):
+ with pytest.raises(transports.StateTransitionError, match='the allowed
states are'):
state.update(transports.State.failed)
def test_update_ok(self):
"""Properly updating a State should update it without errors."""
state = transports.State()
state.update(transports.State.scheduled)
- self.assertEqual(state.current, transports.State.scheduled)
+ assert state.current == transports.State.scheduled
state.update(transports.State.running)
- self.assertEqual(state.current, transports.State.running)
+ assert state.current == transports.State.running
state.update(transports.State.success)
- self.assertEqual(state.current, transports.State.success)
+ assert state.current == transports.State.success
state.update(transports.State.pending)
- self.assertEqual(state.current, transports.State.pending)
+ assert state.current == transports.State.pending
-class TestBaseWorker(unittest.TestCase):
+class TestBaseWorker(object):
"""Concrete BaseWorker class for tests."""
def test_instantiation(self):
"""Raise if instantiated directly, should return an instance of
BaseWorker if inherited."""
- with self.assertRaises(TypeError):
+ with pytest.raises(TypeError):
transports.BaseWorker({}) # pylint:
disable=abstract-class-instantiated
- self.assertIsInstance(ConcreteBaseWorker({}), transports.BaseWorker)
+ assert isinstance(ConcreteBaseWorker({}), transports.BaseWorker)
@mock.patch.dict(transports.os.environ, {}, clear=True)
def test_init(self):
@@ -313,141 +312,142 @@
config = {'transport': 'test_transport',
'environment': env_dict}
- self.assertEqual(transports.os.environ, {})
+ assert transports.os.environ == {}
worker = ConcreteBaseWorker(config)
- self.assertEqual(transports.os.environ, env_dict)
- self.assertEqual(worker.config, config)
+ assert transports.os.environ == env_dict
+ assert worker.config == config
-class TestConcreteBaseWorker(unittest.TestCase):
+class TestConcreteBaseWorker(object):
"""BaseWorker test class."""
- def setUp(self):
+ def setup_method(self, _):
"""Initialize default properties and instances."""
+ # pylint: disable=attribute-defined-outside-init
self.worker = ConcreteBaseWorker({})
self.hosts = ['node1', 'node2']
self.commands = [transports.Command('command1'),
transports.Command('command2')]
def test_hosts_getter(self):
"""Access to hosts property should return an empty list if not set and
the list of hosts otherwise."""
- self.assertListEqual(self.worker.hosts, [])
+ assert self.worker.hosts == []
self.worker._hosts = self.hosts
- self.assertListEqual(self.worker.hosts, self.hosts)
+ assert self.worker.hosts == self.hosts
def test_hosts_setter(self):
"""Raise WorkerError if trying to set it not to an iterable, set it
otherwise."""
- with self.assertRaisesRegexp(transports.WorkerError, r'hosts must be a
list'):
+ with pytest.raises(transports.WorkerError, match='hosts must be a
list'):
self.worker.hosts = 'not-list'
self.worker.hosts = self.hosts
- self.assertListEqual(self.worker._hosts, self.hosts)
+ assert self.worker._hosts == self.hosts
def test_commands_getter(self):
"""Access to commands property should return an empty list if not set
and the list of commands otherwise."""
- self.assertListEqual(self.worker.commands, [])
+ assert self.worker.commands == []
self.worker._commands = self.commands
- self.assertListEqual(self.worker.commands, self.commands)
+ assert self.worker.commands == self.commands
self.worker._commands = None
- self.assertListEqual(self.worker.commands, [])
+ assert self.worker.commands == []
def test_commands_setter(self):
"""Raise WorkerError if trying to set it not to a list, set it
otherwise."""
- with self.assertRaisesRegexp(transports.WorkerError, r'commands must
be a list'):
+ with pytest.raises(transports.WorkerError, match='commands must be a
list'):
self.worker.commands = 'invalid_value'
- with self.assertRaisesRegexp(transports.WorkerError, r'commands must
be a list of Command objects or strings'):
+ with pytest.raises(transports.WorkerError, match='commands must be a
list of Command objects'):
self.worker.commands = [1, 'command2']
self.worker.commands = self.commands
- self.assertListEqual(self.worker._commands, self.commands)
+ assert self.worker._commands == self.commands
self.worker.commands = None
- self.assertIsNone(self.worker._commands)
+ assert self.worker._commands is None
self.worker.commands = ['command1', 'command2']
- self.assertListEqual(self.worker._commands, self.commands)
+ assert self.worker._commands == self.commands
def test_timeout_getter(self):
"""Return default value if not set, the value otherwise."""
- self.assertEqual(self.worker.timeout, 0)
+ assert self.worker.timeout == 0
self.worker._timeout = 10
- self.assertEqual(self.worker.timeout, 10)
+ assert self.worker.timeout == 10
def test_timeout_setter(self):
"""Raise WorkerError if not a positive integer, set it otherwise."""
message = r'timeout must be a positive integer'
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.timeout = -1
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.timeout = 0
self.worker.timeout = 10
- self.assertEqual(self.worker._timeout, 10)
+ assert self.worker._timeout == 10
self.worker.timeout = None
- self.assertEqual(self.worker._timeout, None)
+ assert self.worker._timeout is None
def test_success_threshold_getter(self):
"""Return default value if not set, the value otherwise."""
- self.assertAlmostEqual(self.worker.success_threshold, 1.0)
+ assert self.worker.success_threshold == pytest.approx(1.0)
for success_threshold in (0.0, 0.0001, 0.5, 0.99):
self.worker._success_threshold = success_threshold
- self.assertAlmostEqual(self.worker.success_threshold,
success_threshold)
+ assert self.worker.success_threshold ==
pytest.approx(success_threshold)
def test_success_threshold_setter(self):
"""Raise WorkerError if not float between 0 and 1, set it otherwise."""
message = r'success_threshold must be a float beween 0 and 1'
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.success_threshold = 1
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.success_threshold = -0.1
self.worker.success_threshold = 0.3
- self.assertAlmostEqual(self.worker._success_threshold, 0.3)
+ assert self.worker._success_threshold == pytest.approx(0.3)
def test_batch_size_getter(self):
"""Return default value if not set, the value otherwise."""
self.worker.hosts = self.hosts
- self.assertEqual(self.worker.batch_size, len(self.hosts))
+ assert self.worker.batch_size == len(self.hosts)
self.worker._batch_size = 1
- self.assertEqual(self.worker.batch_size, 1)
+ assert self.worker.batch_size == 1
def test_batch_size_setter(self):
"""Raise WorkerError if not positive integer, set it otherwise forcing
it to len(hosts) if greater."""
- with self.assertRaisesRegexp(transports.WorkerError, r'batch_size must
be a positive integer'):
+ with pytest.raises(transports.WorkerError, match='batch_size must be a
positive integer'):
self.worker.batch_size = -1
- with self.assertRaisesRegexp(transports.WorkerError, r'batch_size must
be a positive integer'):
+ with pytest.raises(transports.WorkerError, match='batch_size must be a
positive integer'):
self.worker.batch_size = 0
self.worker.hosts = self.hosts
self.worker.batch_size = 10
- self.assertEqual(self.worker._batch_size, len(self.hosts))
+ assert self.worker._batch_size == len(self.hosts)
self.worker.batch_size = 1
- self.assertEqual(self.worker._batch_size, 1)
+ assert self.worker._batch_size == 1
def test_batch_sleep_getter(self):
"""Return default value if not set, the value otherwise."""
- self.assertAlmostEqual(self.worker.batch_sleep, 0.0)
+ assert self.worker.batch_sleep == pytest.approx(0.0)
self.worker._batch_sleep = 10.0
- self.assertAlmostEqual(self.worker.batch_sleep, 10.0)
+ assert self.worker.batch_sleep == pytest.approx(10.0)
def test_batch_sleep_setter(self):
"""Raise WorkerError if not positive integer, set it otherwise."""
message = r'batch_sleep must be a positive float'
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.batch_sleep = 1
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.batch_sleep = '1'
- with self.assertRaisesRegexp(transports.WorkerError, message):
+ with pytest.raises(transports.WorkerError, match=message):
self.worker.batch_sleep = -1.0
self.worker.batch_sleep = 10.0
- self.assertAlmostEqual(self.worker._batch_sleep, 10.0)
+ assert self.worker._batch_sleep == pytest.approx(10.0)
-class TestModuleFunctions(unittest.TestCase):
+class TestModuleFunctions(object):
"""Transports module functions test class."""
def test_validate_list(self):
@@ -458,9 +458,9 @@
transports.validate_list('Test', ['value1', 'value2'])
message = r'Test must be a list'
- func = transports.validate_list
for invalid_value in (0, 'invalid_value', {'invalid': 'value'}):
- self.assertRaisesRegexp(transports.WorkerError, message, func,
'Test', invalid_value)
+ with pytest.raises(transports.WorkerError, match=message):
+ transports.validate_list('Test', invalid_value)
def test_validate_positive_integer(self):
"""Should raise a WorkerError if the argument is not a positive
integer or None."""
@@ -469,11 +469,11 @@
transports.validate_positive_integer('Test', 100)
message = r'Test must be a positive integer'
- func = transports.validate_positive_integer
for invalid_value in (0, -1, 'invalid_value', ['invalid_value']):
- self.assertRaisesRegexp(transports.WorkerError, message, func,
'Test', invalid_value)
+ with pytest.raises(transports.WorkerError, match=message):
+ transports.validate_positive_integer('Test', invalid_value)
def test_raise_error(self):
"""Should raise a WorkerError."""
- with self.assertRaisesRegexp(transports.WorkerError, 'Test message'):
+ with pytest.raises(transports.WorkerError, match='Test message'):
transports.raise_error('Test', 'message', 'value')
--
To view, visit https://gerrit.wikimedia.org/r/361274
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ia75f32b212e36d613c784765a52f1317ac690810
Gerrit-PatchSet: 4
Gerrit-Project: operations/software/cumin
Gerrit-Branch: master
Gerrit-Owner: Volans <[email protected]>
Gerrit-Reviewer: Faidon Liambotis <[email protected]>
Gerrit-Reviewer: Gehel <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: Volans <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits