Repository: accumulo Updated Branches: refs/heads/master 39f14472b -> 43f4464b3
ACCUMULO-4034 re-write the agitator to include zookeeper stop/start Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/43f4464b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/43f4464b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/43f4464b Branch: refs/heads/master Commit: 43f4464b38d9c380ed882069da0e76579b0db7a4 Parents: 39f1447 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue Oct 20 17:03:05 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Oct 20 17:03:05 2015 -0400 ---------------------------------------------------------------------- test/system/agitator/.gitignore | 3 + test/system/agitator/README.md | 39 +++++ test/system/agitator/agitator.ini.example | 48 +++++ test/system/agitator/agitator.py | 234 +++++++++++++++++++++++++ test/system/agitator/hosts.example | 16 ++ 5 files changed, 340 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f4464b/test/system/agitator/.gitignore ---------------------------------------------------------------------- diff --git a/test/system/agitator/.gitignore b/test/system/agitator/.gitignore new file mode 100644 index 0000000..3429b01 --- /dev/null +++ b/test/system/agitator/.gitignore @@ -0,0 +1,3 @@ +*~ +*.ini +*.pyc http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f4464b/test/system/agitator/README.md ---------------------------------------------------------------------- diff --git a/test/system/agitator/README.md b/test/system/agitator/README.md new file mode 100644 index 0000000..fdff65b --- /dev/null +++ b/test/system/agitator/README.md @@ -0,0 +1,39 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +Agitator: randomly kill processes +=========================== + +The agitator is used to randomly select processes for termination during +system test. + +Configure the agitator using the example agitator.ini file provided. + +Create a list of hosts to be agitated: + + $ cp ../../../conf/slaves hosts + $ echo master >> hosts + $ echo namenode >> hosts + +The agitator can be used to kill and restart any part of the accumulo +ecosystem: zookeepers, namenode, datanodes, tablet servers and master. +You can choose to agitate them all with "--all" + + $ ./agitator.py --all --hosts=hosts --config=agitator.ini --log DEBUG + +You will need to be able to ssh, without passwords, to all your hosts as +the user that can kill and start the services. http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f4464b/test/system/agitator/agitator.ini.example ---------------------------------------------------------------------- diff --git a/test/system/agitator/agitator.ini.example b/test/system/agitator/agitator.ini.example new file mode 100644 index 0000000..825f0ed --- /dev/null +++ b/test/system/agitator/agitator.ini.example @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[DEFAULT] +install=%(env.pwd)s/../../../.. +user=%(env.user)s + +[agitator] +kill=kill -9 +ssh=ssh -q -t -A -o StrictHostKeyChecking=no +sleep=300 +sleep.restart=30 +sleep.jitter=30 + +[accumulo] +home=%(install)s/accumulo +tserver.kill.min=1 +tserver.kill.max=1 +tserver.frequency=0.8 +master.kill.min=1 +master.kill.max=1 +master.frequency=0.1 + +[hadoop] +home=%(install)s/hadoop +bin=%(home)s/bin +datanode.frequency=0.8 +datanode.kill.min=1 +datanode.kill.max=1 +namenode.frequency=0.05 +namenode.kill.min=1 +namenode.kill.max=1 + +[zookeeper] +home=%(install)s/zookeeper +frequency=0.05 http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f4464b/test/system/agitator/agitator.py ---------------------------------------------------------------------- diff --git a/test/system/agitator/agitator.py b/test/system/agitator/agitator.py new file mode 100755 index 0000000..92b0b28 --- /dev/null +++ b/test/system/agitator/agitator.py @@ -0,0 +1,234 @@ +#! /usr/bin/python + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import logging +import ConfigParser + +# add the environment variables as default settings +import os +defaults=dict([('env.' + k, v) for k, v in os.environ.iteritems()]) +config = ConfigParser.ConfigParser(defaults) + +# things you can do to a particular kind of process +class Proc: + program = 'Unknown' + _frequencyToKill = 1.0 + + def start(self, host): + pass + + def find(self, host): + pass + + def numberToKill(self): + return (1, 1) + + def frequencyToKill(self): + return self._frequencyToKill + + def user(self): + return config.get(self.program, 'user') + + def kill(self, host, pid): + kill = config.get('agitator', 'kill').split() + code, stdout, stderr = self.runOn(host, kill + [pid]) + if code != 0: + raise logging.warn("Unable to kill %d on %s (%s)", pid, host, stderr) + + def runOn(self, host, cmd): + ssh = config.get('agitator', 'ssh').split() + return self.run(ssh + ["%s@%s" % (self.user(), host)] + cmd) + + def run(self, cmd): + import subprocess + cmd = map(str, cmd) + logging.debug('Running %s', cmd) + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + if stdout.strip(): + logging.debug("%s", stdout.strip()) + if stderr.strip(): + logging.error("%s", stderr.strip()) + if p.returncode != 0: + logging.error("Problem running %s", ' '.join(cmd)) + return p.returncode, stdout, stderr + + def __repr__(self): + return self.program + +class Zookeeper(Proc): + program = 'zookeeper' + def __init__(self): + _frequencyToKill = config.get(self.program, 'frequency') + + def start(self, host): + self.runOn(host, [config.get(self.program, 'home') + '/bin/zkServer.sh start']) + + def find(self, host): + code, stdout, stderr = self.runOn(host, ['pgrep -f [Q]uorumPeerMain || true']) + return map(int, [line for line in stdout.split("\n") if line]) + +class Hadoop(Proc): + section = 'hadoop' + def __init__(self, program): + self.program = program + self._frequencyToKill = config.getfloat(self.section, program + '.frequency') + self.minimumToKill = config.getint(self.section, program + '.kill.min') + self.maximumToKill = config.getint(self.section, program + '.kill.max') + + def start(self, host): + binDir = config.get(self.section, 'bin') + self.runOn(host, ['nohup', binDir + "/hdfs", self.program, '&']) + + def find(self, host): + code, stdout, stderr = self.runOn(host, ["pgrep -f 'proc[_]%s' || true" % (self.program,)]) + return map(int, [line for line in stdout.split("\n") if line]) + + def numberToKill(self): + return (self.minimumToKill, self.maximumToKill) + + def user(self): + return config.get(self.section, 'user') + +class Accumulo(Hadoop): + section = 'accumulo' + def start(self, host): + home = config.get(self.section, 'home') + self.runOn(host, ['nohup', home + '/bin/accumulo', self.program, '&']) + + def find(self, host): + code, stdout, stderr = self.runOn(host, ["pgrep -f 'app[=]%s' || true" % self.program]) + return map(int, [line for line in stdout.split("\n") if line]) + +def fail(msg): + import sys + logging.critical(msg) + sys.exit(1) + +def jitter(n): + return random.random() * n - n / 2 + +def sleep(n): + if n > 0: + logging.info("Sleeping %.2f", n) + import time + time.sleep(n) + +def agitate(hosts, procs): + starters = [] + + logging.info("Agitating %s on %d hosts" % (procs, len(hosts))) + + section = 'agitator' + + # repeatedly... + while True: + if starters: + # start up services that were previously killed + t = max(0, config.getfloat(section, 'sleep.restart') + jitter(config.getfloat(section, 'sleep.jitter'))) + sleep(t) + for host, proc in starters: + logging.info('Starting %s on %s', proc, host) + proc.start(host) + starters = [] + + # wait some time + t = max(0, config.getfloat(section, 'sleep') + jitter(config.getfloat(section, 'sleep.jitter'))) + sleep(t) + + # for some processes + for p in procs: + + # roll dice: should it be killed? + if random.random() < p.frequencyToKill(): + + # find them + # TODO: in parallel + candidates = {} + for host in hosts: + pids = p.find(host) + if pids: + candidates[host] = pids + + # how many? + minKill, maxKill = p.numberToKill() + count = min(random.randrange(minKill, maxKill + 1), len(candidates)) + + # pick the victims + doomedHosts = random.sample(candidates.keys(), count) + + # kill them + logging.info("Killing %s on %s", p, doomedHosts) + for doomedHost in doomedHosts: + pids = candidates[doomedHost] + if not pids: + logging.error("Unable to kill any %s on %s: no processes of that type are running", p, doomedHost) + else: + pid = random.choice(pids) + logging.info("Killing %s (%d) on %s", p, pid, host) + p.kill(doomedHost, pid) + # remember to restart them later + starters.append((doomedHost, p)) + +def main(): + import argparse + parser = argparse.ArgumentParser(description='Kill random processes') + parser.add_argument('--log', help='set the log level', default='INFO') + parser.add_argument('--namenodes', help='randomly kill namenodes', action="store_true") + parser.add_argument('--datanodes', help='randomly kill datanodes', action="store_true") + parser.add_argument('--tservers', help='randomly kill tservers', action="store_true") + parser.add_argument('--masters', help='randomly kill masters', action="store_true") + parser.add_argument('--zookeepers', help='randomly kill zookeepers', action="store_true") + parser.add_argument('--all', + help='kill any of the tservers, masters, datanodes, namenodes or zookeepers', + action='store_true') + parser.add_argument('--hosts', type=argparse.FileType('r'), required=True) + parser.add_argument('--config', type=argparse.FileType('r'), required=True) + args = parser.parse_args() + + config.readfp(args.config) + + level = getattr(logging, args.log.upper(), None) + if isinstance(level, int): + logging.basicConfig(level=level) + + procs = [] + def addIf(flag, proc): + if flag or args.all: + procs.append(proc) + + addIf(args.namenodes, Hadoop('namenode')) + addIf(args.datanodes, Hadoop('datanode')) + addIf(args.tservers, Accumulo('tserver')) + addIf(args.masters, Accumulo('master')) + addIf(args.zookeepers, Zookeeper()) + if len(procs) == 0: + fail("No processes to agitate!\n") + + hosts = [] + for line in args.hosts.readlines(): + line = line.strip() + if line and line[0] != '#': + hosts.append(line) + if not hosts: + fail('No hosts to agitate!\n') + + agitate(hosts, procs) + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/accumulo/blob/43f4464b/test/system/agitator/hosts.example ---------------------------------------------------------------------- diff --git a/test/system/agitator/hosts.example b/test/system/agitator/hosts.example new file mode 100644 index 0000000..63fb8bb --- /dev/null +++ b/test/system/agitator/hosts.example @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +localhost