jenkins-bot has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/367825 )
Change subject: Transports: improve target management
......................................................................
Transports: improve target management
Breaking change in the API.
* add a Target class to handle all the target-related configuration
* let the BaseWorker require an instance of the Target class and
delegate to it for all the target-related configuration.
* This changes the BaseWorker constructor signature and removes the
hosts, batch_size and batch_sleep setter/getter.
Bug: T171684
Change-Id: Ifd01a6f8ff3d6a5d7b02505599932a87e099dd90
---
M cumin/cli.py
M cumin/tests/unit/transports/test_clustershell.py
M cumin/tests/unit/transports/test_init.py
M cumin/tests/vulture_whitelist.py
M cumin/transports/__init__.py
M cumin/transports/clustershell.py
6 files changed, 220 insertions(+), 201 deletions(-)
Approvals:
jenkins-bot: Verified
Gehel: Looks good to me, but someone else must approve
Volans: Looks good to me, approved
diff --git a/cumin/cli.py b/cumin/cli.py
index 9571c6c..51e1b5c 100644
--- a/cumin/cli.py
+++ b/cumin/cli.py
@@ -314,7 +314,7 @@
return 0
worker = transport.Transport.new(config, logger)
- worker.hosts = hosts
+ worker.target = transports.Target(hosts, batch_size=args.batch_size,
batch_sleep=args.batch_sleep, logger=logger)
ok_codes = None
if args.ignore_exit_codes:
@@ -325,8 +325,6 @@
worker.timeout = args.global_timeout
worker.handler = args.mode
worker.success_threshold = args.success_percentage / float(100)
- worker.batch_size = args.batch_size
- worker.batch_sleep = args.batch_sleep
exit_code = worker.execute()
if args.interactive:
diff --git a/cumin/tests/unit/transports/test_clustershell.py
b/cumin/tests/unit/transports/test_clustershell.py
index 12d63d8..647263d 100644
--- a/cumin/tests/unit/transports/test_clustershell.py
+++ b/cumin/tests/unit/transports/test_clustershell.py
@@ -4,7 +4,9 @@
import mock
import pytest
-from cumin.transports import BaseWorker, Command, clustershell, State,
WorkerError
+from ClusterShell.NodeSet import NodeSet
+
+from cumin.transports import BaseWorker, Command, clustershell, State, Target,
WorkerError
def test_node_class_instantiation():
@@ -17,7 +19,7 @@
@mock.patch('cumin.transports.clustershell.Task.task_self')
def test_worker_class(task_self):
"""An instance of worker_class should be an instance of BaseWorker."""
- worker = clustershell.worker_class({})
+ worker = clustershell.worker_class({}, Target(NodeSet('node1')))
assert isinstance(worker, BaseWorker)
task_self.assert_called_once_with()
@@ -33,8 +35,8 @@
'ssh_options': ['-o StrictHostKeyChecking=no', '-o
BatchMode=yes'],
'fanout': 3}}
- self.worker = clustershell.worker_class(self.config)
- self.nodes = clustershell.NodeSet.NodeSet('node[1-2]')
+ self.target = Target(NodeSet('node[1-2]'))
+ self.worker = clustershell.worker_class(self.config, self.target)
self.commands = [Command('command1'), Command('command2', ok_codes=[0,
100], timeout=5)]
self.task_self = task_self
# Mock default handlers
@@ -43,13 +45,12 @@
'async': mock.MagicMock(spec_set=clustershell.AsyncEventHandler)}
# Initialize the worker
- self.worker.hosts = self.nodes
self.worker.commands = self.commands
@mock.patch('cumin.transports.clustershell.Task.task_self')
def test_instantiation(self, task_self):
"""An instance of ClusterShellWorker should be an instance of
BaseWorker and initialize ClusterShell."""
- worker = clustershell.ClusterShellWorker(self.config)
+ worker = clustershell.ClusterShellWorker(self.config, self.target)
assert isinstance(worker, BaseWorker)
task_self.assert_called_once_with()
worker.task.set_info.assert_has_calls(
@@ -60,16 +61,20 @@
"""Calling execute() in sync mode without event handler should use the
default sync event handler."""
self.worker.handler = 'sync'
self.worker.execute()
- self.worker.task.shell.assert_called_once_with(
- 'command1', nodes=self.nodes,
handler=self.worker._handler_instance, timeout=None)
+ args, kwargs = self.worker.task.shell.call_args
+ assert args == ('command1',)
+ assert kwargs['nodes'] == self.target.first_batch
+ assert kwargs['handler'] == self.worker._handler_instance
assert clustershell.DEFAULT_HANDLERS['sync'].called
def test_execute_default_async_handler(self):
"""Calling execute() in async mode without event handler should use
the default async event handler."""
self.worker.handler = 'async'
self.worker.execute()
- self.worker.task.shell.assert_called_once_with(
- 'command1', nodes=self.nodes,
handler=self.worker._handler_instance, timeout=None)
+ args, kwargs = self.worker.task.shell.call_args
+ assert args == ('command1',)
+ assert kwargs['nodes'] == self.target.first_batch
+ assert kwargs['handler'] == self.worker._handler_instance
assert clustershell.DEFAULT_HANDLERS['async'].called
def test_execute_timeout(self):
@@ -84,8 +89,10 @@
self.worker.handler = ConcreteBaseEventHandler
self.worker.execute()
assert isinstance(self.worker._handler_instance,
ConcreteBaseEventHandler)
- self.worker.task.shell.assert_called_once_with(
- 'command1', nodes=self.nodes,
handler=self.worker._handler_instance, timeout=None)
+ args, kwargs = self.worker.task.shell.call_args
+ assert args == ('command1',)
+ assert kwargs['nodes'] == self.target.first_batch
+ assert kwargs['handler'] == self.worker._handler_instance
def test_execute_no_commands(self):
"""Calling execute() without commands should raise WorkerError."""
@@ -110,8 +117,10 @@
self.worker.handler = 'sync'
self.worker.batch_size = 1
self.worker.execute()
- self.worker.task.shell.assert_called_once_with(
- 'command1', nodes=self.nodes[:1],
handler=self.worker._handler_instance, timeout=None)
+ args, kwargs = self.worker.task.shell.call_args
+ assert args == ('command1',)
+ assert kwargs['nodes'] == self.target.first_batch
+ assert kwargs['handler'] == self.worker._handler_instance
def test_get_results(self):
"""Calling get_results() should call ClusterShell iter_buffers with
the right parameters."""
@@ -171,19 +180,19 @@
def setup_method(self, *args): # pylint: disable=arguments-differ
"""Initialize default properties and instances."""
- self.nodes = clustershell.NodeSet.NodeSet('node[1-2]')
+ self.target = Target(NodeSet('node[1-2]'))
self.commands = [Command('command1', ok_codes=[0, 100]),
Command('command2', timeout=5)]
self.worker = mock.MagicMock()
self.worker.current_node = 'node1'
self.worker.command = 'command1'
- self.worker.nodes = self.nodes
+ self.worker.nodes = self.target.hosts
self.handler = None
self.args = args
@mock.patch('cumin.transports.clustershell.colorama')
def test_close(self, colorama):
"""Calling close should raise NotImplementedError."""
- self.handler = clustershell.BaseEventHandler(self.nodes, self.commands)
+ self.handler = clustershell.BaseEventHandler(self.target,
self.commands)
with pytest.raises(NotImplementedError):
self.handler.close(self.worker)
colorama.init.assert_called_once_with()
@@ -210,25 +219,24 @@
def setup_method(self, _, tqdm, colorama): # pylint:
disable=arguments-differ
"""Initialize default properties and instances."""
super(TestConcreteBaseEventHandler, self).setup_method()
- self.handler = ConcreteBaseEventHandler(
- self.nodes, self.commands, batch_size=len(self.nodes),
batch_sleep=0.0, first_batch=self.nodes)
+ self.handler = ConcreteBaseEventHandler(self.target, self.commands)
self.worker.eh = self.handler
self.colorama = colorama
assert not tqdm.write.called
def test_instantiation(self):
"""An instance of ConcreteBaseEventHandler should be an instance of
BaseEventHandler and initialize colorama."""
- assert sorted(self.handler.nodes.keys()) == list(self.nodes)
+ assert sorted(self.handler.nodes.keys()) == list(self.target.hosts)
self.colorama.init.assert_called_once_with()
@mock.patch('cumin.transports.clustershell.tqdm')
def test_on_timeout(self, tqdm):
"""Calling on_timeout() should update the fail progress bar."""
- for node in self.nodes:
+ for node in self.target.hosts:
self.worker.current_node = node
self.handler.ev_pickup(self.worker)
self.worker.task.num_timeout.return_value = 1
- self.worker.task.iter_keys_timeout.return_value = [self.nodes[0]]
+ self.worker.task.iter_keys_timeout.return_value =
[self.target.hosts[0]]
assert not self.handler.global_timedout
self.handler.on_timeout(self.worker.task)
@@ -238,7 +246,7 @@
def test_ev_pickup(self):
"""Calling ev_pickup() should set the state of the current node to
running."""
- for node in self.nodes:
+ for node in self.target.hosts:
self.worker.current_node = node
self.handler.ev_pickup(self.worker)
running_nodes = [node for node in self.worker.eh.nodes.itervalues() if
node.state.is_running]
@@ -247,7 +255,7 @@
@mock.patch('cumin.transports.clustershell.tqdm')
def test_ev_read_many_hosts(self, tqdm):
"""Calling ev_read() should not print the worker message if matching
multiple hosts."""
- for node in self.nodes:
+ for node in self.target.hosts:
self.worker.current_node = node
self.worker.current_msg = 'Node output'
self.handler.ev_read(self.worker)
@@ -256,20 +264,19 @@
@mock.patch('cumin.transports.clustershell.tqdm')
def test_ev_read_single_host(self, tqdm):
"""Calling ev_read() should print the worker message if matching a
single host."""
- self.nodes = clustershell.NodeSet.NodeSet('node1')
- self.handler = ConcreteBaseEventHandler(
- self.nodes, self.commands, batch_size=len(self.nodes),
batch_sleep=0.0, first_batch=self.nodes)
+ self.target = Target(NodeSet('node1'))
+ self.handler = ConcreteBaseEventHandler(self.target, self.commands)
output = 'node1 output'
- self.worker.nodes = self.nodes
- self.worker.current_node = self.nodes[0]
+ self.worker.nodes = self.target.hosts
+ self.worker.current_node = self.target.hosts[0]
self.worker.current_msg = output
self.handler.ev_read(self.worker)
tqdm.write.assert_has_calls([mock.call(output)])
def test_ev_timeout(self):
"""Calling ev_timeout() should increase the counters for the timed out
hosts."""
- for node in self.nodes:
+ for node in self.target.hosts:
self.worker.current_node = node
self.handler.ev_pickup(self.worker)
@@ -288,9 +295,7 @@
def setup_method(self, _, tqdm, colorama, logger): # pylint:
disable=arguments-differ
"""Initialize default properties and instances."""
super(TestSyncEventHandler, self).setup_method()
- self.handler = clustershell.SyncEventHandler(
- self.nodes, self.commands, success_threshold=1,
batch_size=len(self.nodes), batch_sleep=0, logger=None,
- first_batch=self.nodes)
+ self.handler = clustershell.SyncEventHandler(self.target,
self.commands, success_threshold=1)
self.worker.eh = self.handler
self.colorama = colorama
self.logger = logger
@@ -330,7 +335,7 @@
assert not self.handler.end_command()
self.handler.counters['success'] = 2
assert self.handler.end_command()
- self.handler.kwargs['success_threshold'] = 0.5
+ self.handler.success_threshold = 0.5
self.handler.counters['success'] = 1
assert self.handler.end_command()
self.handler.current_command_index = 1
@@ -387,7 +392,7 @@
def setup_method(self, _, tqdm, colorama, logger): # pylint:
disable=arguments-differ
"""Initialize default properties and instances."""
super(TestAsyncEventHandler, self).setup_method()
- self.handler = clustershell.AsyncEventHandler(self.nodes,
self.commands)
+ self.handler = clustershell.AsyncEventHandler(self.target,
self.commands)
self.worker.eh = self.handler
self.colorama = colorama
self.logger = logger
@@ -400,8 +405,6 @@
def test_ev_hup_ok(self):
"""Calling ev_hup with a worker that has zero exit status should
enqueue the next command."""
- for node in self.handler.nodes.itervalues():
- node.state.update(State.scheduled)
self.handler.ev_pickup(self.worker)
self.worker.current_rc = 0
self.handler.ev_hup(self.worker)
@@ -418,8 +421,6 @@
def test_ev_hup_ko(self):
"""Calling ev_hup with a worker that has non-zero exit status should
not enqueue the next command."""
- for node in self.handler.nodes.itervalues():
- node.state.update(State.scheduled)
self.handler.ev_pickup(self.worker)
self.worker.current_rc = 1
self.handler.ev_hup(self.worker)
diff --git a/cumin/tests/unit/transports/test_init.py
b/cumin/tests/unit/transports/test_init.py
index 6e78929..8f75535 100644
--- a/cumin/tests/unit/transports/test_init.py
+++ b/cumin/tests/unit/transports/test_init.py
@@ -307,15 +307,83 @@
assert state.current == transports.State.pending
+class TestTarget(object):
+ """Target class tests."""
+
+ def setup_method(self, _):
+ """Initialize default properties and instances."""
+ # pylint: disable=attribute-defined-outside-init
+ self.hosts_list = ['host' + str(i) for i in xrange(10)]
+ self.hosts = NodeSet.fromlist(self.hosts_list)
+
+ def test_instantiation_nodeset(self):
+ """Creating a Target instance with a NodeSet and without optional
parameter should return their defaults."""
+ target = transports.Target(self.hosts)
+ assert target.hosts == self.hosts
+ assert target.batch_size == len(self.hosts)
+ assert target.batch_sleep == 0.0
+
+ def test_instantiation_list(self):
+ """Creating a Target instance with a list and without optional
parameter should return their defaults."""
+ target = transports.Target(self.hosts_list)
+ assert target.hosts == self.hosts
+ assert target.batch_size == len(self.hosts)
+ assert target.batch_sleep == 0.0
+
+ def test_instantiation_invalid(self):
+ """Creating a Target instance with invalid hosts should raise
WorkerError."""
+ with pytest.raises(transports.WorkerError, match="must be a
ClusterShell's NodeSet or a list"):
+ transports.Target(set(self.hosts_list))
+
+ @mock.patch('cumin.transports.logging.Logger.debug')
+ def test_instantiation_batch_size(self, mocked_logger):
+        """Creating a Target instance with a batch_size should set it to its
value, if valid."""
+ target = transports.Target(self.hosts, batch_size=5)
+ assert target.batch_size == 5
+
+ target = transports.Target(self.hosts, batch_size=len(self.hosts) + 1)
+ assert target.batch_size == len(self.hosts)
+ assert mocked_logger.called
+
+ target = transports.Target(self.hosts, batch_size=None)
+ assert target.batch_size == len(self.hosts)
+
+ with pytest.raises(transports.WorkerError):
+ transports.Target(self.hosts, batch_size=0)
+
+ def test_instantiation_batch_sleep(self):
+        """Creating a Target instance with a batch_sleep should set it to its
value, if valid."""
+ target = transports.Target(self.hosts, batch_sleep=5.0)
+ assert target.batch_sleep == pytest.approx(5.0)
+
+ target = transports.Target(self.hosts, batch_sleep=None)
+ assert target.batch_sleep == pytest.approx(0.0)
+
+ with pytest.raises(transports.WorkerError):
+ transports.Target(self.hosts, batch_sleep=0)
+
+ with pytest.raises(transports.WorkerError):
+ transports.Target(self.hosts, batch_sleep=-1.0)
+
+ def test_first_batch(self):
+ """The first_batch property should return the first_batch of hosts."""
+ size = 5
+ target = transports.Target(self.hosts, batch_size=size)
+ assert len(target.first_batch) == size
+ assert target.first_batch == NodeSet.fromlist(self.hosts[:size])
+ assert isinstance(target.first_batch, NodeSet)
+
+
class TestBaseWorker(object):
"""Concrete BaseWorker class for tests."""
def test_instantiation(self):
"""Raise if instantiated directly, should return an instance of
BaseWorker if inherited."""
+ target = transports.Target(NodeSet('node1'))
with pytest.raises(TypeError):
- transports.BaseWorker({}) # pylint:
disable=abstract-class-instantiated
+ transports.BaseWorker({}, target) # pylint:
disable=abstract-class-instantiated
- assert isinstance(ConcreteBaseWorker({}), transports.BaseWorker)
+ assert isinstance(ConcreteBaseWorker({},
transports.Target(NodeSet('node[1-2]'))), transports.BaseWorker)
@mock.patch.dict(transports.os.environ, {}, clear=True)
def test_init(self):
@@ -325,7 +393,7 @@
'environment': env_dict}
assert transports.os.environ == {}
- worker = ConcreteBaseWorker(config)
+ worker = ConcreteBaseWorker(config,
transports.Target(NodeSet('node[1-2]')))
assert transports.os.environ == env_dict
assert worker.config == config
@@ -336,23 +404,8 @@
def setup_method(self, _):
"""Initialize default properties and instances."""
# pylint: disable=attribute-defined-outside-init
- self.worker = ConcreteBaseWorker({})
- self.hosts = NodeSet('node[1-2]')
+ self.worker = ConcreteBaseWorker({},
transports.Target(NodeSet('node[1-2]')))
self.commands = [transports.Command('command1'),
transports.Command('command2')]
-
- def test_hosts_getter(self):
- """Access to hosts property should return an empty list if not set and
the list of hosts otherwise."""
- assert self.worker.hosts == []
- self.worker._hosts = self.hosts
- assert self.worker.hosts == self.hosts
-
- def test_hosts_setter(self):
- """Raise WorkerError if trying to set it not to an iterable, set it
otherwise."""
- with pytest.raises(transports.WorkerError, match="must be an instance
of ClusterShell's NodeSet or None"):
- self.worker.hosts = 'not-list'
-
- self.worker.hosts = self.hosts
- assert self.worker._hosts == self.hosts
def test_commands_getter(self):
"""Access to commands property should return an empty list if not set
and the list of commands otherwise."""
@@ -415,48 +468,6 @@
self.worker.success_threshold = 0.3
assert self.worker._success_threshold == pytest.approx(0.3)
-
- def test_batch_size_getter(self):
- """Return default value if not set, the value otherwise."""
- self.worker.hosts = self.hosts
- assert self.worker.batch_size == len(self.hosts)
- self.worker._batch_size = 1
- assert self.worker.batch_size == 1
-
- def test_batch_size_setter(self):
- """Raise WorkerError if not positive integer, set it otherwise forcing
it to len(hosts) if greater."""
- with pytest.raises(transports.WorkerError, match='batch_size must be a
positive integer'):
- self.worker.batch_size = -1
-
- with pytest.raises(transports.WorkerError, match='batch_size must be a
positive integer'):
- self.worker.batch_size = 0
-
- self.worker.hosts = self.hosts
- self.worker.batch_size = 10
- assert self.worker._batch_size == len(self.hosts)
- self.worker.batch_size = 1
- assert self.worker._batch_size == 1
-
- def test_batch_sleep_getter(self):
- """Return default value if not set, the value otherwise."""
- assert self.worker.batch_sleep == pytest.approx(0.0)
- self.worker._batch_sleep = 10.0
- assert self.worker.batch_sleep == pytest.approx(10.0)
-
- def test_batch_sleep_setter(self):
- """Raise WorkerError if not positive integer, set it otherwise."""
- message = r'batch_sleep must be a positive float'
- with pytest.raises(transports.WorkerError, match=message):
- self.worker.batch_sleep = 1
-
- with pytest.raises(transports.WorkerError, match=message):
- self.worker.batch_sleep = '1'
-
- with pytest.raises(transports.WorkerError, match=message):
- self.worker.batch_sleep = -1.0
-
- self.worker.batch_sleep = 10.0
- assert self.worker._batch_sleep == pytest.approx(10.0)
class TestModuleFunctions(object):
diff --git a/cumin/tests/vulture_whitelist.py b/cumin/tests/vulture_whitelist.py
index 255aca2..5613903 100644
--- a/cumin/tests/vulture_whitelist.py
+++ b/cumin/tests/vulture_whitelist.py
@@ -15,6 +15,9 @@
whitelist_cli = Whitelist()
whitelist_cli.run.h
+whitelist_transports_clustershell = Whitelist()
+whitelist_transports_clustershell.BaseEventHandler.kwargs
+
whitelist_tests_integration_conftest = Whitelist()
whitelist_tests_integration_conftest.pytest_cmdline_preparse
whitelist_tests_integration_conftest.pytest_runtest_makereport
diff --git a/cumin/transports/__init__.py b/cumin/transports/__init__.py
index b664492..50d3a15 100644
--- a/cumin/transports/__init__.py
+++ b/cumin/transports/__init__.py
@@ -230,31 +230,94 @@
self._state = new
+class Target(object):
+ """Targets management class."""
+
+ def __init__(self, hosts, batch_size=None, batch_sleep=None, logger=None):
+        """Constructor, initialize the Target with the list of hosts and
additional parameters.
+
+ Arguments:
+ hosts -- a ClusterShell's NodeSet or a list of hosts that will
be targeted
+          batch_size  -- set the batch size so that no more than this number of
hosts are targeted at any given time.
+ If greater than the number of hosts it will be
auto-resized to the number of hosts. It must be
+ a positive integer or None to unset it. [optional,
default: None]
+ batch_sleep -- sleep time in seconds between the end of execution of
one host in the batch and the start in
+ the next host. It must be a positive float or None to
unset it. [optional, default: None]
+ logger -- a logging.Logger instance [optional, default: None]
+ """
+ self.logger = logger or logging.getLogger(__name__)
+
+ if isinstance(hosts, NodeSet):
+ self.hosts = hosts
+ elif isinstance(hosts, list):
+ self.hosts = NodeSet.fromlist(hosts)
+ else:
+ raise_error('hosts', "must be a ClusterShell's NodeSet or a list",
hosts)
+
+ self.batch_size = self._compute_batch_size(batch_size, self.hosts)
+ self.batch_sleep = Target._compute_batch_sleep(batch_sleep)
+
+ @property
+ def first_batch(self):
+ """Extract the first batch of hosts to execute."""
+ return self.hosts[:self.batch_size]
+
+ def _compute_batch_size(self, batch_size, hosts):
+ """Compute the batch_size based on the hosts size and return the value
to be used.
+
+ Arguments:
+ batch_size -- a positive integer to indicate the batch_size to apply
when executing the worker or None to get
+ its default value. If greater than the number of hosts,
the number of hosts will be used as value.
+ hosts -- the list of hosts to use to calculate the batch size.
+ """
+ validate_positive_integer('batch_size', batch_size)
+ hosts_size = len(hosts)
+
+ if batch_size is None:
+ batch_size = hosts_size
+ elif batch_size > hosts_size:
+ self.logger.debug(("Provided batch_size '{batch_size}' is greater
than the number of hosts '{hosts_size}'"
+ ", using '{hosts_size}' as
value").format(batch_size=batch_size, hosts_size=hosts_size))
+ batch_size = hosts_size
+
+ return batch_size
+
+ @staticmethod
+ def _compute_batch_sleep(batch_sleep):
+ """Validate batch_sleep and return its value or a default value.
+
+ Arguments:
+ batch_sleep -- a positive float indicating the sleep in seconds to
apply between one batched host and the next,
+ or None to get its default value.
+ """
+ validate_positive_float('batch_sleep', batch_sleep)
+ return batch_sleep or 0.0
+
+
class BaseWorker(object):
"""Worker interface to be extended by concrete workers."""
__metaclass__ = ABCMeta
- def __init__(self, config, logger=None):
+ def __init__(self, config, target, logger=None):
"""Worker constructor. Setup environment variables and initialize
properties.
Arguments:
config -- a dictionary with the parsed configuration file
+ target -- a Target instance
logger -- an optional logger instance [optional, default: None]
"""
self.config = config
+ self.target = target
self.logger = logger or logging.getLogger(__name__)
self.logger.trace('Transport {name} created with config:
{config}'.format(
name=type(self).__name__, config=config))
# Initialize setters values
- self._hosts = None
self._commands = None
self._handler = None
self._timeout = None
self._success_threshold = None
- self._batch_size = None
- self._batch_sleep = None
for key, value in config.get('environment', {}).iteritems():
os.environ[key] = value
@@ -266,23 +329,6 @@
@abstractmethod
def get_results(self):
"""Generator that yields tuples '(node_name, result)' with the results
of the current execution."""
-
- @property
- def hosts(self):
- """Getter for the hosts property with a default value."""
- return self._hosts or []
-
- @hosts.setter
- def hosts(self, value):
- """Setter for the hosts property with validation, raise WorkerError if
not valid.
-
- Arguments:
- value -- a ClusterShell.NodeSet.NodeSet instance or None to reset it.
- """
- if value is not None and not isinstance(value, NodeSet):
- raise_error('hosts', "must be an instance of ClusterShell's
NodeSet or None", value)
-
- self._hosts = value
@property
def commands(self):
@@ -370,43 +416,6 @@
value_type=type(value), value=value))
self._success_threshold = value
-
- @property
- def batch_size(self):
- """Getter for the batch_size property, default to the number of hosts
if not set."""
- return self._batch_size or len(self.hosts)
-
- @batch_size.setter
- def batch_size(self, value):
- """Setter for the batch_size property with validation, raise
WorkerError if not valid.
-
- Arguments:
- value -- the value to set the batch_size to, if greater than the
number of hosts it will be auto-resized to the
- number of hosts. Must be a positive integer or None to unset
it.
- """
- validate_positive_integer('batch_size', value)
- hosts_size = len(self.hosts)
- if value is not None and value > hosts_size:
- self.logger.debug(("Provided batch_size '{batch_size}' is greater
than the number of hosts '{hosts_size}'"
- ", using '{hosts_size}' as
value").format(batch_size=value, hosts_size=hosts_size))
- value = hosts_size
-
- self._batch_size = value
-
- @property
- def batch_sleep(self):
- """Getter for the batch_sleep property, default to 0.0 if not set."""
- return self._batch_sleep or 0.0
-
- @batch_sleep.setter
- def batch_sleep(self, value):
- """Setter for the batch_sleep property with validation, raise
WorkerError if value is not valid.
-
- Arguments:
- value -- the value to set the batch_sleep to. Must be a positive float
or None to unset it.
- """
- validate_positive_float('batch_sleep', value)
- self._batch_sleep = value
def validate_list(property_name, value, allow_empty=False):
diff --git a/cumin/transports/clustershell.py b/cumin/transports/clustershell.py
index 4151f7c..6cbd146 100644
--- a/cumin/transports/clustershell.py
+++ b/cumin/transports/clustershell.py
@@ -16,12 +16,12 @@
class ClusterShellWorker(BaseWorker):
"""It provides a Cumin worker for SSH using the ClusterShell library."""
- def __init__(self, config, logger=None):
+ def __init__(self, config, target, logger=None):
"""Worker ClusterShell constructor.
- Arguments: according to BaseQuery interface
+ Arguments: according to BaseWorker
"""
- super(ClusterShellWorker, self).__init__(config, logger)
+ super(ClusterShellWorker, self).__init__(config, target, logger)
self.task = Task.task_self() # Initialize a ClusterShell task
self._handler_instance = None
@@ -41,17 +41,14 @@
if self.handler is None:
raise RuntimeError('An EventHandler is mandatory.')
- # Schedule only the first command for the first batch, the following
ones must be handled by the EventHandler
- first_batch = self.hosts[:self.batch_size]
-
# Instantiate handler
+ # Schedule only the first command for the first batch, the following
ones must be handled by the EventHandler
self._handler_instance = self.handler( # pylint: disable=not-callable
- self.hosts, self.commands,
success_threshold=self.success_threshold, batch_size=self.batch_size,
- batch_sleep=self.batch_sleep, logger=self.logger,
first_batch=first_batch)
+ self.target, self.commands,
success_threshold=self.success_threshold, logger=self.logger)
self.logger.info("Executing commands {commands} on '{num}' hosts:
{hosts}".format(
- commands=self.commands, num=len(self.hosts), hosts=self.hosts))
- self.task.shell(self.commands[0].command, nodes=first_batch,
handler=self._handler_instance,
+ commands=self.commands, num=len(self.target.hosts),
hosts=self.target.hosts))
+ self.task.shell(self.commands[0].command,
nodes=self.target.first_batch, handler=self._handler_instance,
timeout=self.commands[0].timeout)
return_value = 0
@@ -124,18 +121,23 @@
short_command_length = 35 # For logging and printing the commands are
shortened to reach at most this length
- def __init__(self, nodes, commands, **kwargs):
+ def __init__(self, target, commands, success_threshold=1.0, logger=None,
**kwargs):
"""Event handler ClusterShell extension constructor.
If subclasses defines a self.pbar_ko tqdm progress bar, it will be
updated on timeout.
Arguments:
- nodes -- the ClusterShell's NodeSet with which this worker was
initiliazed.
- commands -- the list of Command objects that has to be executed on the
nodes.
- **kwargs -- optional additional keyword arguments that might be used
by classes that extend this base class.
+ target -- a Target instance.
+ commands -- the list of Command objects that has to be
executed on the nodes.
+ success_threshold -- the success threshold, a float between 0 and 1,
to consider the execution successful.
+ [optional, default: 1.0]
+ **kwargs -- additional keyword arguments that might be used
by classes that extend this base class.
+ [optional]
"""
super(BaseEventHandler, self).__init__()
- self.logger = kwargs.get('logger', None) or logging.getLogger(__name__)
+ self.success_threshold = success_threshold
+ self.logger = logger or logging.getLogger(__name__)
+ self.target = target
self.lock = threading.Lock() # Used to update instance variables
coherently from within callbacks
# Execution management variables
@@ -143,14 +145,14 @@
self.commands = commands
self.kwargs = kwargs # Allow to store custom parameters from
subclasses without changing the signature
self.counters = Counter()
- self.counters['total'] = len(nodes)
+ self.counters['total'] = len(target.hosts)
self.deduplicate_output = self.counters['total'] > 1
self.global_timedout = False
# Instantiate all the node instances, slicing the commands list to get
a copy
- self.nodes = {node: Node(node, commands[:]) for node in nodes}
+ self.nodes = {node: Node(node, commands[:]) for node in target.hosts}
# Move already all the nodes in the first_batch to the scheduled
state, it means that ClusterShell was
# already instructed to execute a command on those nodes
- for node_name in kwargs.get('first_batch', []):
+ for node_name in target.first_batch:
self.nodes[node_name].state.update(State.scheduled)
# Initialize color and progress bar formats
@@ -260,7 +262,7 @@
self.lock.release()
# Schedule a timer to run the current command on the next node or
start the next command
- worker.task.timer(self.kwargs.get('batch_sleep', 0.0), worker.eh)
+ worker.task.timer(self.target.batch_sleep, worker.eh)
def _get_log_message(self, num, message, nodes=None):
"""Helper to get a pre-formatted message suitable for logging or
printing.
@@ -387,10 +389,9 @@
num = self.counters['success']
tot = self.counters['total']
- success_threshold = self.kwargs.get('success_threshold', 1)
success_ratio = float(num) / tot
- if success_ratio < success_threshold:
+ if success_ratio < self.success_threshold:
comp = '<'
post = '. Aborting.'
else:
@@ -406,14 +407,14 @@
nodes = [node.name for node in self.nodes.itervalues() if
node.state.is_success]
message = "success ratio ({comp} {perc:.1%}
threshold){message_string}{post}".format(
- comp=comp, perc=success_threshold, message_string=message_string,
post=post)
+ comp=comp, perc=self.success_threshold,
message_string=message_string, post=post)
log_message, nodes_string = self._get_log_message(num, message,
nodes=nodes)
final_message = '{message}{nodes}'.format(message=log_message,
nodes=nodes_string)
if num == tot:
color = colorama.Fore.GREEN
self.logger.info(final_message)
- elif success_ratio >= success_threshold:
+ elif success_ratio >= self.success_threshold:
color = colorama.Fore.YELLOW
self.logger.warning(final_message)
else:
@@ -438,12 +439,13 @@
enough nodes before proceeding with the next one.
"""
- def __init__(self, nodes, commands, **kwargs):
+ def __init__(self, target, commands, success_threshold=1.0, logger=None,
**kwargs):
"""Custom ClusterShell synchronous event handler constructor.
Arguments: according to BaseEventHandler interface
"""
- super(SyncEventHandler, self).__init__(nodes, commands, **kwargs)
+ super(SyncEventHandler, self).__init__(
+ target, commands, success_threshold=success_threshold,
logger=logger, **kwargs)
self.current_command_index = 0 # Global pointer for the current
command in execution across all nodes
self.start_command()
self.aborted = False
@@ -468,13 +470,11 @@
# Schedule the next command, the first was already scheduled by
ClusterShellWorker.execute()
if schedule:
- batch_size = self.kwargs.get('batch_size', self.counters['total'])
-
self.lock.acquire() # Avoid modifications of the same data from
other callbacks triggered by ClusterShell
try:
# Available nodes for the next command execution were already
updated back to the pending state
remaining_nodes = [node.name for node in
self.nodes.itervalues() if node.state.is_pending]
- first_batch = remaining_nodes[:batch_size]
+ first_batch = remaining_nodes[:self.target.batch_size]
first_batch_set = NodeSet.NodeSet.fromlist(first_batch)
for node_name in first_batch:
self.nodes[node_name].state.update(State.scheduled)
@@ -502,11 +502,10 @@
self._failed_commands_report(filter_command_index=self.current_command_index)
self._success_nodes_report(command=self.commands[self.current_command_index].command)
- success_threshold = self.kwargs.get('success_threshold', 1)
success_ratio = float(self.counters['success']) /
self.counters['total']
# Abort on failure
- if success_ratio < success_threshold:
+ if success_ratio < self.success_threshold:
self.return_value = 2
self.aborted = True # Tells other timers that might trigger after
that the abort is already in progress
return False
@@ -562,7 +561,7 @@
self.lock.release()
# Schedule a timer to run the current command on the next node or
start the next command
- worker.task.timer(self.kwargs.get('batch_sleep', 0.0), worker.eh)
+ worker.task.timer(self.target.batch_sleep, worker.eh)
def ev_timer(self, timer):
"""Schedule the current command on the next node or the next command
on the first batch of nodes.
@@ -571,11 +570,10 @@
Arguments: according to EventHandler interface
"""
- success_threshold = self.kwargs.get('success_threshold', 1)
success_ratio = 1 - (float(self.counters['failed'] +
self.counters['timeout']) / self.counters['total'])
node = None
- if success_ratio >= success_threshold:
+ if success_ratio >= self.success_threshold:
# Success ratio is still good, looking for the next node
self.lock.acquire() # Avoid modifications of the same data from
other callbacks triggered by ClusterShell
try:
@@ -659,12 +657,13 @@
orchestration between the nodes.
"""
- def __init__(self, nodes, commands, **kwargs):
+ def __init__(self, target, commands, success_threshold=1.0, logger=None,
**kwargs):
"""Custom ClusterShell asynchronous event handler constructor.
Arguments: according to BaseEventHandler interface
"""
- super(AsyncEventHandler, self).__init__(nodes, commands, **kwargs)
+ super(AsyncEventHandler, self).__init__(
+ target, commands, success_threshold=success_threshold,
logger=logger, **kwargs)
self.pbar_ok = tqdm(desc='PASS', total=self.counters['total'],
leave=True, unit='hosts', dynamic_ncols=True,
bar_format=colorama.Fore.GREEN + self.bar_format,
file=sys.stderr)
@@ -716,7 +715,7 @@
timeout=command.timeout)
elif schedule_timer:
# Schedule a timer to allow to run all the commands in the next
available node
- worker.task.timer(self.kwargs.get('batch_sleep', 0.0), worker.eh)
+ worker.task.timer(self.target.batch_sleep, worker.eh)
def ev_timer(self, timer):
"""Schedule the current command on the next node or the next command
on the first batch of nodes.
@@ -725,11 +724,10 @@
Arguments: according to EventHandler interface
"""
- success_threshold = self.kwargs.get('success_threshold', 1)
success_ratio = 1 - (float(self.counters['failed'] +
self.counters['timeout']) / self.counters['total'])
node = None
- if success_ratio >= success_threshold:
+ if success_ratio >= self.success_threshold:
# Success ratio is still good, looking for the next node
self.lock.acquire() # Avoid modifications of the same data from
other callbacks triggered by ClusterShell
try:
@@ -767,12 +765,11 @@
num = self.counters['success']
tot = self.counters['total']
- success_threshold = self.kwargs.get('success_threshold', 1)
success_ratio = float(num) / tot
if success_ratio == 1:
self.return_value = 0
- elif success_ratio < success_threshold:
+ elif success_ratio < self.success_threshold:
self.return_value = 2
else:
self.return_value = 1
--
To view, visit https://gerrit.wikimedia.org/r/367825
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ifd01a6f8ff3d6a5d7b02505599932a87e099dd90
Gerrit-PatchSet: 3
Gerrit-Project: operations/software/cumin
Gerrit-Branch: master
Gerrit-Owner: Volans <[email protected]>
Gerrit-Reviewer: Faidon Liambotis <[email protected]>
Gerrit-Reviewer: Gehel <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: Volans <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits