jenkins-bot has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/367825 )

Change subject: Transports: improve target management
......................................................................


Transports: improve target management

Breaking change in the API.

* add a Target class to handle all the target-related configuration
* let the BaseWorker require an instance of the Target class and
  delegate to it for all the target-related configuration.
* This changes the BaseWorker constructor signature and removes the
  hosts, batch_size and batch_sleep setter/getter.

Bug: T171684
Change-Id: Ifd01a6f8ff3d6a5d7b02505599932a87e099dd90
---
M cumin/cli.py
M cumin/tests/unit/transports/test_clustershell.py
M cumin/tests/unit/transports/test_init.py
M cumin/tests/vulture_whitelist.py
M cumin/transports/__init__.py
M cumin/transports/clustershell.py
6 files changed, 220 insertions(+), 201 deletions(-)

Approvals:
  jenkins-bot: Verified
  Gehel: Looks good to me, but someone else must approve
  Volans: Looks good to me, approved



diff --git a/cumin/cli.py b/cumin/cli.py
index 9571c6c..51e1b5c 100644
--- a/cumin/cli.py
+++ b/cumin/cli.py
@@ -314,7 +314,7 @@
         return 0
 
     worker = transport.Transport.new(config, logger)
-    worker.hosts = hosts
+    worker.target = transports.Target(hosts, batch_size=args.batch_size, 
batch_sleep=args.batch_sleep, logger=logger)
 
     ok_codes = None
     if args.ignore_exit_codes:
@@ -325,8 +325,6 @@
     worker.timeout = args.global_timeout
     worker.handler = args.mode
     worker.success_threshold = args.success_percentage / float(100)
-    worker.batch_size = args.batch_size
-    worker.batch_sleep = args.batch_sleep
     exit_code = worker.execute()
 
     if args.interactive:
diff --git a/cumin/tests/unit/transports/test_clustershell.py 
b/cumin/tests/unit/transports/test_clustershell.py
index 12d63d8..647263d 100644
--- a/cumin/tests/unit/transports/test_clustershell.py
+++ b/cumin/tests/unit/transports/test_clustershell.py
@@ -4,7 +4,9 @@
 import mock
 import pytest
 
-from cumin.transports import BaseWorker, Command, clustershell, State, 
WorkerError
+from ClusterShell.NodeSet import NodeSet
+
+from cumin.transports import BaseWorker, Command, clustershell, State, Target, 
WorkerError
 
 
 def test_node_class_instantiation():
@@ -17,7 +19,7 @@
 @mock.patch('cumin.transports.clustershell.Task.task_self')
 def test_worker_class(task_self):
     """An instance of worker_class should be an instance of BaseWorker."""
-    worker = clustershell.worker_class({})
+    worker = clustershell.worker_class({}, Target(NodeSet('node1')))
     assert isinstance(worker, BaseWorker)
     task_self.assert_called_once_with()
 
@@ -33,8 +35,8 @@
                 'ssh_options': ['-o StrictHostKeyChecking=no', '-o 
BatchMode=yes'],
                 'fanout': 3}}
 
-        self.worker = clustershell.worker_class(self.config)
-        self.nodes = clustershell.NodeSet.NodeSet('node[1-2]')
+        self.target = Target(NodeSet('node[1-2]'))
+        self.worker = clustershell.worker_class(self.config, self.target)
         self.commands = [Command('command1'), Command('command2', ok_codes=[0, 
100], timeout=5)]
         self.task_self = task_self
         # Mock default handlers
@@ -43,13 +45,12 @@
             'async': mock.MagicMock(spec_set=clustershell.AsyncEventHandler)}
 
         # Initialize the worker
-        self.worker.hosts = self.nodes
         self.worker.commands = self.commands
 
     @mock.patch('cumin.transports.clustershell.Task.task_self')
     def test_instantiation(self, task_self):
         """An instance of ClusterShellWorker should be an instance of 
BaseWorker and initialize ClusterShell."""
-        worker = clustershell.ClusterShellWorker(self.config)
+        worker = clustershell.ClusterShellWorker(self.config, self.target)
         assert isinstance(worker, BaseWorker)
         task_self.assert_called_once_with()
         worker.task.set_info.assert_has_calls(
@@ -60,16 +61,20 @@
         """Calling execute() in sync mode without event handler should use the 
default sync event handler."""
         self.worker.handler = 'sync'
         self.worker.execute()
-        self.worker.task.shell.assert_called_once_with(
-            'command1', nodes=self.nodes, 
handler=self.worker._handler_instance, timeout=None)
+        args, kwargs = self.worker.task.shell.call_args
+        assert args == ('command1',)
+        assert kwargs['nodes'] == self.target.first_batch
+        assert kwargs['handler'] == self.worker._handler_instance
         assert clustershell.DEFAULT_HANDLERS['sync'].called
 
     def test_execute_default_async_handler(self):
         """Calling execute() in async mode without event handler should use 
the default async event handler."""
         self.worker.handler = 'async'
         self.worker.execute()
-        self.worker.task.shell.assert_called_once_with(
-            'command1', nodes=self.nodes, 
handler=self.worker._handler_instance, timeout=None)
+        args, kwargs = self.worker.task.shell.call_args
+        assert args == ('command1',)
+        assert kwargs['nodes'] == self.target.first_batch
+        assert kwargs['handler'] == self.worker._handler_instance
         assert clustershell.DEFAULT_HANDLERS['async'].called
 
     def test_execute_timeout(self):
@@ -84,8 +89,10 @@
         self.worker.handler = ConcreteBaseEventHandler
         self.worker.execute()
         assert isinstance(self.worker._handler_instance, 
ConcreteBaseEventHandler)
-        self.worker.task.shell.assert_called_once_with(
-            'command1', nodes=self.nodes, 
handler=self.worker._handler_instance, timeout=None)
+        args, kwargs = self.worker.task.shell.call_args
+        assert args == ('command1',)
+        assert kwargs['nodes'] == self.target.first_batch
+        assert kwargs['handler'] == self.worker._handler_instance
 
     def test_execute_no_commands(self):
         """Calling execute() without commands should raise WorkerError."""
@@ -110,8 +117,10 @@
         self.worker.handler = 'sync'
         self.worker.batch_size = 1
         self.worker.execute()
-        self.worker.task.shell.assert_called_once_with(
-            'command1', nodes=self.nodes[:1], 
handler=self.worker._handler_instance, timeout=None)
+        args, kwargs = self.worker.task.shell.call_args
+        assert args == ('command1',)
+        assert kwargs['nodes'] == self.target.first_batch
+        assert kwargs['handler'] == self.worker._handler_instance
 
     def test_get_results(self):
         """Calling get_results() should call ClusterShell iter_buffers with 
the right parameters."""
@@ -171,19 +180,19 @@
 
     def setup_method(self, *args):  # pylint: disable=arguments-differ
         """Initialize default properties and instances."""
-        self.nodes = clustershell.NodeSet.NodeSet('node[1-2]')
+        self.target = Target(NodeSet('node[1-2]'))
         self.commands = [Command('command1', ok_codes=[0, 100]), 
Command('command2', timeout=5)]
         self.worker = mock.MagicMock()
         self.worker.current_node = 'node1'
         self.worker.command = 'command1'
-        self.worker.nodes = self.nodes
+        self.worker.nodes = self.target.hosts
         self.handler = None
         self.args = args
 
     @mock.patch('cumin.transports.clustershell.colorama')
     def test_close(self, colorama):
         """Calling close should raise NotImplementedError."""
-        self.handler = clustershell.BaseEventHandler(self.nodes, self.commands)
+        self.handler = clustershell.BaseEventHandler(self.target, 
self.commands)
         with pytest.raises(NotImplementedError):
             self.handler.close(self.worker)
         colorama.init.assert_called_once_with()
@@ -210,25 +219,24 @@
     def setup_method(self, _, tqdm, colorama):  # pylint: 
disable=arguments-differ
         """Initialize default properties and instances."""
         super(TestConcreteBaseEventHandler, self).setup_method()
-        self.handler = ConcreteBaseEventHandler(
-            self.nodes, self.commands, batch_size=len(self.nodes), 
batch_sleep=0.0, first_batch=self.nodes)
+        self.handler = ConcreteBaseEventHandler(self.target, self.commands)
         self.worker.eh = self.handler
         self.colorama = colorama
         assert not tqdm.write.called
 
     def test_instantiation(self):
         """An instance of ConcreteBaseEventHandler should be an instance of 
BaseEventHandler and initialize colorama."""
-        assert sorted(self.handler.nodes.keys()) == list(self.nodes)
+        assert sorted(self.handler.nodes.keys()) == list(self.target.hosts)
         self.colorama.init.assert_called_once_with()
 
     @mock.patch('cumin.transports.clustershell.tqdm')
     def test_on_timeout(self, tqdm):
         """Calling on_timeout() should update the fail progress bar."""
-        for node in self.nodes:
+        for node in self.target.hosts:
             self.worker.current_node = node
             self.handler.ev_pickup(self.worker)
         self.worker.task.num_timeout.return_value = 1
-        self.worker.task.iter_keys_timeout.return_value = [self.nodes[0]]
+        self.worker.task.iter_keys_timeout.return_value = 
[self.target.hosts[0]]
 
         assert not self.handler.global_timedout
         self.handler.on_timeout(self.worker.task)
@@ -238,7 +246,7 @@
 
     def test_ev_pickup(self):
         """Calling ev_pickup() should set the state of the current node to 
running."""
-        for node in self.nodes:
+        for node in self.target.hosts:
             self.worker.current_node = node
             self.handler.ev_pickup(self.worker)
         running_nodes = [node for node in self.worker.eh.nodes.itervalues() if 
node.state.is_running]
@@ -247,7 +255,7 @@
     @mock.patch('cumin.transports.clustershell.tqdm')
     def test_ev_read_many_hosts(self, tqdm):
         """Calling ev_read() should not print the worker message if matching 
multiple hosts."""
-        for node in self.nodes:
+        for node in self.target.hosts:
             self.worker.current_node = node
             self.worker.current_msg = 'Node output'
             self.handler.ev_read(self.worker)
@@ -256,20 +264,19 @@
     @mock.patch('cumin.transports.clustershell.tqdm')
     def test_ev_read_single_host(self, tqdm):
         """Calling ev_read() should print the worker message if matching a 
single host."""
-        self.nodes = clustershell.NodeSet.NodeSet('node1')
-        self.handler = ConcreteBaseEventHandler(
-            self.nodes, self.commands, batch_size=len(self.nodes), 
batch_sleep=0.0, first_batch=self.nodes)
+        self.target = Target(NodeSet('node1'))
+        self.handler = ConcreteBaseEventHandler(self.target, self.commands)
 
         output = 'node1 output'
-        self.worker.nodes = self.nodes
-        self.worker.current_node = self.nodes[0]
+        self.worker.nodes = self.target.hosts
+        self.worker.current_node = self.target.hosts[0]
         self.worker.current_msg = output
         self.handler.ev_read(self.worker)
         tqdm.write.assert_has_calls([mock.call(output)])
 
     def test_ev_timeout(self):
         """Calling ev_timeout() should increase the counters for the timed out 
hosts."""
-        for node in self.nodes:
+        for node in self.target.hosts:
             self.worker.current_node = node
             self.handler.ev_pickup(self.worker)
 
@@ -288,9 +295,7 @@
     def setup_method(self, _, tqdm, colorama, logger):  # pylint: 
disable=arguments-differ
         """Initialize default properties and instances."""
         super(TestSyncEventHandler, self).setup_method()
-        self.handler = clustershell.SyncEventHandler(
-            self.nodes, self.commands, success_threshold=1, 
batch_size=len(self.nodes), batch_sleep=0, logger=None,
-            first_batch=self.nodes)
+        self.handler = clustershell.SyncEventHandler(self.target, 
self.commands, success_threshold=1)
         self.worker.eh = self.handler
         self.colorama = colorama
         self.logger = logger
@@ -330,7 +335,7 @@
         assert not self.handler.end_command()
         self.handler.counters['success'] = 2
         assert self.handler.end_command()
-        self.handler.kwargs['success_threshold'] = 0.5
+        self.handler.success_threshold = 0.5
         self.handler.counters['success'] = 1
         assert self.handler.end_command()
         self.handler.current_command_index = 1
@@ -387,7 +392,7 @@
     def setup_method(self, _, tqdm, colorama, logger):  # pylint: 
disable=arguments-differ
         """Initialize default properties and instances."""
         super(TestAsyncEventHandler, self).setup_method()
-        self.handler = clustershell.AsyncEventHandler(self.nodes, 
self.commands)
+        self.handler = clustershell.AsyncEventHandler(self.target, 
self.commands)
         self.worker.eh = self.handler
         self.colorama = colorama
         self.logger = logger
@@ -400,8 +405,6 @@
 
     def test_ev_hup_ok(self):
         """Calling ev_hup with a worker that has zero exit status should 
enqueue the next command."""
-        for node in self.handler.nodes.itervalues():
-            node.state.update(State.scheduled)
         self.handler.ev_pickup(self.worker)
         self.worker.current_rc = 0
         self.handler.ev_hup(self.worker)
@@ -418,8 +421,6 @@
 
     def test_ev_hup_ko(self):
         """Calling ev_hup with a worker that has non-zero exit status should 
not enqueue the next command."""
-        for node in self.handler.nodes.itervalues():
-            node.state.update(State.scheduled)
         self.handler.ev_pickup(self.worker)
         self.worker.current_rc = 1
         self.handler.ev_hup(self.worker)
diff --git a/cumin/tests/unit/transports/test_init.py 
b/cumin/tests/unit/transports/test_init.py
index 6e78929..8f75535 100644
--- a/cumin/tests/unit/transports/test_init.py
+++ b/cumin/tests/unit/transports/test_init.py
@@ -307,15 +307,83 @@
         assert state.current == transports.State.pending
 
 
+class TestTarget(object):
+    """Target class tests."""
+
+    def setup_method(self, _):
+        """Initialize default properties and instances."""
+        # pylint: disable=attribute-defined-outside-init
+        self.hosts_list = ['host' + str(i) for i in xrange(10)]
+        self.hosts = NodeSet.fromlist(self.hosts_list)
+
+    def test_instantiation_nodeset(self):
+        """Creating a Target instance with a NodeSet and without optional 
parameter should return their defaults."""
+        target = transports.Target(self.hosts)
+        assert target.hosts == self.hosts
+        assert target.batch_size == len(self.hosts)
+        assert target.batch_sleep == 0.0
+
+    def test_instantiation_list(self):
+        """Creating a Target instance with a list and without optional 
parameter should return their defaults."""
+        target = transports.Target(self.hosts_list)
+        assert target.hosts == self.hosts
+        assert target.batch_size == len(self.hosts)
+        assert target.batch_sleep == 0.0
+
+    def test_instantiation_invalid(self):
+        """Creating a Target instance with invalid hosts should raise 
WorkerError."""
+        with pytest.raises(transports.WorkerError, match="must be a 
ClusterShell's NodeSet or a list"):
+            transports.Target(set(self.hosts_list))
+
+    @mock.patch('cumin.transports.logging.Logger.debug')
+    def test_instantiation_batch_size(self, mocked_logger):
+        """Creating a Target instance with a batch_size should set it to its 
value, if valid."""
+        target = transports.Target(self.hosts, batch_size=5)
+        assert target.batch_size == 5
+
+        target = transports.Target(self.hosts, batch_size=len(self.hosts) + 1)
+        assert target.batch_size == len(self.hosts)
+        assert mocked_logger.called
+
+        target = transports.Target(self.hosts, batch_size=None)
+        assert target.batch_size == len(self.hosts)
+
+        with pytest.raises(transports.WorkerError):
+            transports.Target(self.hosts, batch_size=0)
+
+    def test_instantiation_batch_sleep(self):
+        """Creating a Target instance with a batch_sleep should set it to its 
value, if valid."""
+        target = transports.Target(self.hosts, batch_sleep=5.0)
+        assert target.batch_sleep == pytest.approx(5.0)
+
+        target = transports.Target(self.hosts, batch_sleep=None)
+        assert target.batch_sleep == pytest.approx(0.0)
+
+        with pytest.raises(transports.WorkerError):
+            transports.Target(self.hosts, batch_sleep=0)
+
+        with pytest.raises(transports.WorkerError):
+            transports.Target(self.hosts, batch_sleep=-1.0)
+
+    def test_first_batch(self):
+        """The first_batch property should return the first_batch of hosts."""
+        size = 5
+        target = transports.Target(self.hosts, batch_size=size)
+        assert len(target.first_batch) == size
+        assert target.first_batch == NodeSet.fromlist(self.hosts[:size])
+        assert isinstance(target.first_batch, NodeSet)
+
+
 class TestBaseWorker(object):
     """Concrete BaseWorker class for tests."""
 
     def test_instantiation(self):
         """Raise if instantiated directly, should return an instance of 
BaseWorker if inherited."""
+        target = transports.Target(NodeSet('node1'))
         with pytest.raises(TypeError):
-            transports.BaseWorker({})  # pylint: 
disable=abstract-class-instantiated
+            transports.BaseWorker({}, target)  # pylint: 
disable=abstract-class-instantiated
 
-        assert isinstance(ConcreteBaseWorker({}), transports.BaseWorker)
+        assert isinstance(ConcreteBaseWorker({}, 
transports.Target(NodeSet('node[1-2]'))), transports.BaseWorker)
 
     @mock.patch.dict(transports.os.environ, {}, clear=True)
     def test_init(self):
@@ -325,7 +393,7 @@
                   'environment': env_dict}
 
         assert transports.os.environ == {}
-        worker = ConcreteBaseWorker(config)
+        worker = ConcreteBaseWorker(config, 
transports.Target(NodeSet('node[1-2]')))
         assert transports.os.environ == env_dict
         assert worker.config == config
 
@@ -336,23 +404,8 @@
     def setup_method(self, _):
         """Initialize default properties and instances."""
         # pylint: disable=attribute-defined-outside-init
-        self.worker = ConcreteBaseWorker({})
-        self.hosts = NodeSet('node[1-2]')
+        self.worker = ConcreteBaseWorker({}, 
transports.Target(NodeSet('node[1-2]')))
         self.commands = [transports.Command('command1'), 
transports.Command('command2')]
-
-    def test_hosts_getter(self):
-        """Access to hosts property should return an empty list if not set and 
the list of hosts otherwise."""
-        assert self.worker.hosts == []
-        self.worker._hosts = self.hosts
-        assert self.worker.hosts == self.hosts
-
-    def test_hosts_setter(self):
-        """Raise WorkerError if trying to set it not to an iterable, set it 
otherwise."""
-        with pytest.raises(transports.WorkerError, match="must be an instance 
of ClusterShell's NodeSet or None"):
-            self.worker.hosts = 'not-list'
-
-        self.worker.hosts = self.hosts
-        assert self.worker._hosts == self.hosts
 
     def test_commands_getter(self):
         """Access to commands property should return an empty list if not set 
and the list of commands otherwise."""
@@ -415,48 +468,6 @@
 
         self.worker.success_threshold = 0.3
         assert self.worker._success_threshold == pytest.approx(0.3)
-
-    def test_batch_size_getter(self):
-        """Return default value if not set, the value otherwise."""
-        self.worker.hosts = self.hosts
-        assert self.worker.batch_size == len(self.hosts)
-        self.worker._batch_size = 1
-        assert self.worker.batch_size == 1
-
-    def test_batch_size_setter(self):
-        """Raise WorkerError if not positive integer, set it otherwise forcing 
it to len(hosts) if greater."""
-        with pytest.raises(transports.WorkerError, match='batch_size must be a 
positive integer'):
-            self.worker.batch_size = -1
-
-        with pytest.raises(transports.WorkerError, match='batch_size must be a 
positive integer'):
-            self.worker.batch_size = 0
-
-        self.worker.hosts = self.hosts
-        self.worker.batch_size = 10
-        assert self.worker._batch_size == len(self.hosts)
-        self.worker.batch_size = 1
-        assert self.worker._batch_size == 1
-
-    def test_batch_sleep_getter(self):
-        """Return default value if not set, the value otherwise."""
-        assert self.worker.batch_sleep == pytest.approx(0.0)
-        self.worker._batch_sleep = 10.0
-        assert self.worker.batch_sleep == pytest.approx(10.0)
-
-    def test_batch_sleep_setter(self):
-        """Raise WorkerError if not positive integer, set it otherwise."""
-        message = r'batch_sleep must be a positive float'
-        with pytest.raises(transports.WorkerError, match=message):
-            self.worker.batch_sleep = 1
-
-        with pytest.raises(transports.WorkerError, match=message):
-            self.worker.batch_sleep = '1'
-
-        with pytest.raises(transports.WorkerError, match=message):
-            self.worker.batch_sleep = -1.0
-
-        self.worker.batch_sleep = 10.0
-        assert self.worker._batch_sleep == pytest.approx(10.0)
 
 
 class TestModuleFunctions(object):
diff --git a/cumin/tests/vulture_whitelist.py b/cumin/tests/vulture_whitelist.py
index 255aca2..5613903 100644
--- a/cumin/tests/vulture_whitelist.py
+++ b/cumin/tests/vulture_whitelist.py
@@ -15,6 +15,9 @@
 whitelist_cli = Whitelist()
 whitelist_cli.run.h
 
+whitelist_transports_clustershell = Whitelist()
+whitelist_transports_clustershell.BaseEventHandler.kwargs
+
 whitelist_tests_integration_conftest = Whitelist()
 whitelist_tests_integration_conftest.pytest_cmdline_preparse
 whitelist_tests_integration_conftest.pytest_runtest_makereport
diff --git a/cumin/transports/__init__.py b/cumin/transports/__init__.py
index b664492..50d3a15 100644
--- a/cumin/transports/__init__.py
+++ b/cumin/transports/__init__.py
@@ -230,31 +230,94 @@
         self._state = new
 
 
+class Target(object):
+    """Targets management class."""
+
+    def __init__(self, hosts, batch_size=None, batch_sleep=None, logger=None):
+        """Constructor, initialize the Target with the list of hosts and 
additional parameters.
+
+        Arguments:
+        hosts       -- a ClusterShell's NodeSet or a list of hosts that will 
be targeted
+        batch_size  -- set the batch size so that no more than this number of 
hosts are targeted at any given time.
+                       If greater than the number of hosts it will be 
auto-resized to the number of hosts. It must be
+                       a positive integer or None to unset it. [optional, 
default: None]
+        batch_sleep -- sleep time in seconds between the end of execution of 
one host in the batch and the start in
+                       the next host. It must be a positive float or None to 
unset it. [optional, default: None]
+        logger      -- a logging.Logger instance [optional, default: None]
+        """
+        self.logger = logger or logging.getLogger(__name__)
+
+        if isinstance(hosts, NodeSet):
+            self.hosts = hosts
+        elif isinstance(hosts, list):
+            self.hosts = NodeSet.fromlist(hosts)
+        else:
+            raise_error('hosts', "must be a ClusterShell's NodeSet or a list", 
hosts)
+
+        self.batch_size = self._compute_batch_size(batch_size, self.hosts)
+        self.batch_sleep = Target._compute_batch_sleep(batch_sleep)
+
+    @property
+    def first_batch(self):
+        """Extract the first batch of hosts to execute."""
+        return self.hosts[:self.batch_size]
+
+    def _compute_batch_size(self, batch_size, hosts):
+        """Compute the batch_size based on the hosts size and return the value 
to be used.
+
+        Arguments:
+        batch_size -- a positive integer to indicate the batch_size to apply 
when executing the worker or None to get
+                      its default value. If greater than the number of hosts, 
the number of hosts will be used as value.
+        hosts      -- the list of hosts to use to calculate the batch size.
+        """
+        validate_positive_integer('batch_size', batch_size)
+        hosts_size = len(hosts)
+
+        if batch_size is None:
+            batch_size = hosts_size
+        elif batch_size > hosts_size:
+            self.logger.debug(("Provided batch_size '{batch_size}' is greater 
than the number of hosts '{hosts_size}'"
+                               ", using '{hosts_size}' as 
value").format(batch_size=batch_size, hosts_size=hosts_size))
+            batch_size = hosts_size
+
+        return batch_size
+
+    @staticmethod
+    def _compute_batch_sleep(batch_sleep):
+        """Validate batch_sleep and return its value or a default value.
+
+        Arguments:
+        batch_sleep -- a positive float indicating the sleep in seconds to 
apply between one batched host and the next,
+                       or None to get its default value.
+        """
+        validate_positive_float('batch_sleep', batch_sleep)
+        return batch_sleep or 0.0
+
+
 class BaseWorker(object):
     """Worker interface to be extended by concrete workers."""
 
     __metaclass__ = ABCMeta
 
-    def __init__(self, config, logger=None):
+    def __init__(self, config, target, logger=None):
         """Worker constructor. Setup environment variables and initialize 
properties.
 
         Arguments:
         config -- a dictionary with the parsed configuration file
+        target -- a Target instance
         logger -- an optional logger instance [optional, default: None]
         """
         self.config = config
+        self.target = target
         self.logger = logger or logging.getLogger(__name__)
         self.logger.trace('Transport {name} created with config: 
{config}'.format(
             name=type(self).__name__, config=config))
 
         # Initialize setters values
-        self._hosts = None
         self._commands = None
         self._handler = None
         self._timeout = None
         self._success_threshold = None
-        self._batch_size = None
-        self._batch_sleep = None
 
         for key, value in config.get('environment', {}).iteritems():
             os.environ[key] = value
@@ -266,23 +329,6 @@
     @abstractmethod
     def get_results(self):
         """Generator that yields tuples '(node_name, result)' with the results 
of the current execution."""
-
-    @property
-    def hosts(self):
-        """Getter for the hosts property with a default value."""
-        return self._hosts or []
-
-    @hosts.setter
-    def hosts(self, value):
-        """Setter for the hosts property with validation, raise WorkerError if 
not valid.
-
-        Arguments:
-        value -- a ClusterShell.NodeSet.NodeSet instance or None to reset it.
-        """
-        if value is not None and not isinstance(value, NodeSet):
-            raise_error('hosts', "must be an instance of ClusterShell's 
NodeSet or None", value)
-
-        self._hosts = value
 
     @property
     def commands(self):
@@ -370,43 +416,6 @@
                 value_type=type(value), value=value))
 
         self._success_threshold = value
-
-    @property
-    def batch_size(self):
-        """Getter for the batch_size property, default to the number of hosts 
if not set."""
-        return self._batch_size or len(self.hosts)
-
-    @batch_size.setter
-    def batch_size(self, value):
-        """Setter for the batch_size property with validation, raise 
WorkerError if not valid.
-
-        Arguments:
-        value -- the value to set the batch_size to, if greater than the 
number of hosts it will be auto-resized to the
-                 number of hosts. Must be a positive integer or None to unset 
it.
-        """
-        validate_positive_integer('batch_size', value)
-        hosts_size = len(self.hosts)
-        if value is not None and value > hosts_size:
-            self.logger.debug(("Provided batch_size '{batch_size}' is greater 
than the number of hosts '{hosts_size}'"
-                               ", using '{hosts_size}' as 
value").format(batch_size=value, hosts_size=hosts_size))
-            value = hosts_size
-
-        self._batch_size = value
-
-    @property
-    def batch_sleep(self):
-        """Getter for the batch_sleep property, default to 0.0 if not set."""
-        return self._batch_sleep or 0.0
-
-    @batch_sleep.setter
-    def batch_sleep(self, value):
-        """Setter for the batch_sleep property with validation, raise 
WorkerError if value is not valid.
-
-        Arguments:
-        value -- the value to set the batch_sleep to. Must be a positive float 
or None to unset it.
-        """
-        validate_positive_float('batch_sleep', value)
-        self._batch_sleep = value
 
 
 def validate_list(property_name, value, allow_empty=False):
diff --git a/cumin/transports/clustershell.py b/cumin/transports/clustershell.py
index 4151f7c..6cbd146 100644
--- a/cumin/transports/clustershell.py
+++ b/cumin/transports/clustershell.py
@@ -16,12 +16,12 @@
 class ClusterShellWorker(BaseWorker):
     """It provides a Cumin worker for SSH using the ClusterShell library."""
 
-    def __init__(self, config, logger=None):
+    def __init__(self, config, target, logger=None):
         """Worker ClusterShell constructor.
 
-        Arguments: according to BaseQuery interface
+        Arguments: according to BaseWorker
         """
-        super(ClusterShellWorker, self).__init__(config, logger)
+        super(ClusterShellWorker, self).__init__(config, target, logger)
         self.task = Task.task_self()  # Initialize a ClusterShell task
         self._handler_instance = None
 
@@ -41,17 +41,14 @@
         if self.handler is None:
             raise RuntimeError('An EventHandler is mandatory.')
 
-        # Schedule only the first command for the first batch, the following 
ones must be handled by the EventHandler
-        first_batch = self.hosts[:self.batch_size]
-
         # Instantiate handler
+        # Schedule only the first command for the first batch, the following 
ones must be handled by the EventHandler
         self._handler_instance = self.handler(  # pylint: disable=not-callable
-            self.hosts, self.commands, 
success_threshold=self.success_threshold, batch_size=self.batch_size,
-            batch_sleep=self.batch_sleep, logger=self.logger, 
first_batch=first_batch)
+            self.target, self.commands, 
success_threshold=self.success_threshold, logger=self.logger)
 
         self.logger.info("Executing commands {commands} on '{num}' hosts: 
{hosts}".format(
-            commands=self.commands, num=len(self.hosts), hosts=self.hosts))
-        self.task.shell(self.commands[0].command, nodes=first_batch, 
handler=self._handler_instance,
+            commands=self.commands, num=len(self.target.hosts), 
hosts=self.target.hosts))
+        self.task.shell(self.commands[0].command, 
nodes=self.target.first_batch, handler=self._handler_instance,
                         timeout=self.commands[0].timeout)
 
         return_value = 0
@@ -124,18 +121,23 @@
 
     short_command_length = 35  # For logging and printing the commands are 
shortened to reach at most this length
 
-    def __init__(self, nodes, commands, **kwargs):
+    def __init__(self, target, commands, success_threshold=1.0, logger=None, 
**kwargs):
         """Event handler ClusterShell extension constructor.
 
        If a subclass defines a self.pbar_ko tqdm progress bar, it will be 
updated on timeout.
 
         Arguments:
-        nodes    -- the ClusterShell's NodeSet with which this worker was 
initiliazed.
-        commands -- the list of Command objects that has to be executed on the 
nodes.
-        **kwargs -- optional additional keyword arguments that might be used 
by classes that extend this base class.
+        target            -- a Target instance.
+        commands          -- the list of Command objects that has to be 
executed on the nodes.
+        success_threshold -- the success threshold, a float between 0 and 1, 
to consider the execution successful.
+                             [optional, default: 1.0]
+        **kwargs          -- additional keyword arguments that might be used 
by classes that extend this base class.
+                             [optional]
         """
         super(BaseEventHandler, self).__init__()
-        self.logger = kwargs.get('logger', None) or logging.getLogger(__name__)
+        self.success_threshold = success_threshold
+        self.logger = logger or logging.getLogger(__name__)
+        self.target = target
         self.lock = threading.Lock()  # Used to update instance variables 
coherently from within callbacks
 
         # Execution management variables
@@ -143,14 +145,14 @@
         self.commands = commands
         self.kwargs = kwargs  # Allow to store custom parameters from 
subclasses without changing the signature
         self.counters = Counter()
-        self.counters['total'] = len(nodes)
+        self.counters['total'] = len(target.hosts)
         self.deduplicate_output = self.counters['total'] > 1
         self.global_timedout = False
         # Instantiate all the node instances, slicing the commands list to get 
a copy
-        self.nodes = {node: Node(node, commands[:]) for node in nodes}
+        self.nodes = {node: Node(node, commands[:]) for node in target.hosts}
        # Move all the nodes in the first_batch to the scheduled state 
already; this means that ClusterShell was
        # already instructed to execute a command on those nodes
-        for node_name in kwargs.get('first_batch', []):
+        for node_name in target.first_batch:
             self.nodes[node_name].state.update(State.scheduled)
 
         # Initialize color and progress bar formats
@@ -260,7 +262,7 @@
             self.lock.release()
 
         # Schedule a timer to run the current command on the next node or 
start the next command
-        worker.task.timer(self.kwargs.get('batch_sleep', 0.0), worker.eh)
+        worker.task.timer(self.target.batch_sleep, worker.eh)
 
     def _get_log_message(self, num, message, nodes=None):
         """Helper to get a pre-formatted message suitable for logging or 
printing.
@@ -387,10 +389,9 @@
             num = self.counters['success']
 
         tot = self.counters['total']
-        success_threshold = self.kwargs.get('success_threshold', 1)
         success_ratio = float(num) / tot
 
-        if success_ratio < success_threshold:
+        if success_ratio < self.success_threshold:
             comp = '<'
             post = '. Aborting.'
         else:
@@ -406,14 +407,14 @@
             nodes = [node.name for node in self.nodes.itervalues() if 
node.state.is_success]
 
         message = "success ratio ({comp} {perc:.1%} 
threshold){message_string}{post}".format(
-            comp=comp, perc=success_threshold, message_string=message_string, 
post=post)
+            comp=comp, perc=self.success_threshold, 
message_string=message_string, post=post)
         log_message, nodes_string = self._get_log_message(num, message, 
nodes=nodes)
         final_message = '{message}{nodes}'.format(message=log_message, 
nodes=nodes_string)
 
         if num == tot:
             color = colorama.Fore.GREEN
             self.logger.info(final_message)
-        elif success_ratio >= success_threshold:
+        elif success_ratio >= self.success_threshold:
             color = colorama.Fore.YELLOW
             self.logger.warning(final_message)
         else:
@@ -438,12 +439,13 @@
     enough nodes before proceeding with the next one.
     """
 
-    def __init__(self, nodes, commands, **kwargs):
+    def __init__(self, target, commands, success_threshold=1.0, logger=None, 
**kwargs):
         """Custom ClusterShell synchronous event handler constructor.
 
         Arguments: according to BaseEventHandler interface
         """
-        super(SyncEventHandler, self).__init__(nodes, commands, **kwargs)
+        super(SyncEventHandler, self).__init__(
+            target, commands, success_threshold=success_threshold, 
logger=logger, **kwargs)
         self.current_command_index = 0  # Global pointer for the current 
command in execution across all nodes
         self.start_command()
         self.aborted = False
@@ -468,13 +470,11 @@
 
         # Schedule the next command, the first was already scheduled by 
ClusterShellWorker.execute()
         if schedule:
-            batch_size = self.kwargs.get('batch_size', self.counters['total'])
-
             self.lock.acquire()  # Avoid modifications of the same data from 
other callbacks triggered by ClusterShell
             try:
                # Available nodes for the next command execution were already 
updated back to the pending state
                 remaining_nodes = [node.name for node in 
self.nodes.itervalues() if node.state.is_pending]
-                first_batch = remaining_nodes[:batch_size]
+                first_batch = remaining_nodes[:self.target.batch_size]
                 first_batch_set = NodeSet.NodeSet.fromlist(first_batch)
                 for node_name in first_batch:
                     self.nodes[node_name].state.update(State.scheduled)
@@ -502,11 +502,10 @@
         
self._failed_commands_report(filter_command_index=self.current_command_index)
         
self._success_nodes_report(command=self.commands[self.current_command_index].command)
 
-        success_threshold = self.kwargs.get('success_threshold', 1)
         success_ratio = float(self.counters['success']) / 
self.counters['total']
 
         # Abort on failure
-        if success_ratio < success_threshold:
+        if success_ratio < self.success_threshold:
             self.return_value = 2
             self.aborted = True  # Tells other timers that might trigger after 
that the abort is already in progress
             return False
@@ -562,7 +561,7 @@
             self.lock.release()
 
         # Schedule a timer to run the current command on the next node or 
start the next command
-        worker.task.timer(self.kwargs.get('batch_sleep', 0.0), worker.eh)
+        worker.task.timer(self.target.batch_sleep, worker.eh)
 
     def ev_timer(self, timer):
         """Schedule the current command on the next node or the next command 
on the first batch of nodes.
@@ -571,11 +570,10 @@
 
         Arguments: according to EventHandler interface
         """
-        success_threshold = self.kwargs.get('success_threshold', 1)
         success_ratio = 1 - (float(self.counters['failed'] + 
self.counters['timeout']) / self.counters['total'])
 
         node = None
-        if success_ratio >= success_threshold:
+        if success_ratio >= self.success_threshold:
             # Success ratio is still good, looking for the next node
             self.lock.acquire()  # Avoid modifications of the same data from 
other callbacks triggered by ClusterShell
             try:
@@ -659,12 +657,13 @@
     orchestration between the nodes.
     """
 
-    def __init__(self, nodes, commands, **kwargs):
+    def __init__(self, target, commands, success_threshold=1.0, logger=None, 
**kwargs):
         """Custom ClusterShell asynchronous event handler constructor.
 
         Arguments: according to BaseEventHandler interface
         """
-        super(AsyncEventHandler, self).__init__(nodes, commands, **kwargs)
+        super(AsyncEventHandler, self).__init__(
+            target, commands, success_threshold=success_threshold, 
logger=logger, **kwargs)
 
         self.pbar_ok = tqdm(desc='PASS', total=self.counters['total'], 
leave=True, unit='hosts', dynamic_ncols=True,
                             bar_format=colorama.Fore.GREEN + self.bar_format, 
file=sys.stderr)
@@ -716,7 +715,7 @@
                               timeout=command.timeout)
         elif schedule_timer:
             # Schedule a timer to allow to run all the commands in the next 
available node
-            worker.task.timer(self.kwargs.get('batch_sleep', 0.0), worker.eh)
+            worker.task.timer(self.target.batch_sleep, worker.eh)
 
     def ev_timer(self, timer):
         """Schedule the current command on the next node or the next command 
on the first batch of nodes.
@@ -725,11 +724,10 @@
 
         Arguments: according to EventHandler interface
         """
-        success_threshold = self.kwargs.get('success_threshold', 1)
         success_ratio = 1 - (float(self.counters['failed'] + 
self.counters['timeout']) / self.counters['total'])
 
         node = None
-        if success_ratio >= success_threshold:
+        if success_ratio >= self.success_threshold:
             # Success ratio is still good, looking for the next node
             self.lock.acquire()  # Avoid modifications of the same data from 
other callbacks triggered by ClusterShell
             try:
@@ -767,12 +765,11 @@
 
         num = self.counters['success']
         tot = self.counters['total']
-        success_threshold = self.kwargs.get('success_threshold', 1)
         success_ratio = float(num) / tot
 
         if success_ratio == 1:
             self.return_value = 0
-        elif success_ratio < success_threshold:
+        elif success_ratio < self.success_threshold:
             self.return_value = 2
         else:
             self.return_value = 1

-- 
To view, visit https://gerrit.wikimedia.org/r/367825
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ifd01a6f8ff3d6a5d7b02505599932a87e099dd90
Gerrit-PatchSet: 3
Gerrit-Project: operations/software/cumin
Gerrit-Branch: master
Gerrit-Owner: Volans <[email protected]>
Gerrit-Reviewer: Faidon Liambotis <[email protected]>
Gerrit-Reviewer: Gehel <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: Volans <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to