Giuseppe Lavagetto has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/355609 )
Change subject: Split up pybal.pybal in smaller files ...................................................................... Split up pybal.pybal in smaller files This allows us to have smaller files and also to avoid import pitfalls with circular dependencies. Change-Id: I05ea6c9657c11330d3c001a1afe1eb2f115ff3b3 --- A pybal/bgpfailover.py R pybal/coordinator.py M pybal/ipvs.py A pybal/main.py M pybal/test/test_ipvs.py M pybal/test/test_pybal.py M scripts/pybal 7 files changed, 267 insertions(+), 233 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal refs/changes/09/355609/1 diff --git a/pybal/bgpfailover.py b/pybal/bgpfailover.py new file mode 100755 index 0000000..5392bcb --- /dev/null +++ b/pybal/bgpfailover.py @@ -0,0 +1,91 @@ +#!/usr/bin/python + +""" +PyBal +Copyright (C) 2006-2017 by Mark Bergsma <[email protected]> + +LVS Squid balancer/monitor for managing the Wikimedia Squid servers using LVS +""" + +from twisted.internet import reactor + +from pybal.util import log +try: + from pybal.bgp import bgp +except ImportError: + pass + + +class BGPFailover: + """Class for maintaining a BGP session to a router for IP address failover""" + + prefixes = {} + peerings = [] + + def __init__(self, globalConfig): + self.globalConfig = globalConfig + + if self.globalConfig.getboolean('bgp', False): + self.setup() + + def setup(self): + try: + self.bgpPeering = bgp.NaiveBGPPeering(myASN=self.globalConfig.getint('bgp-local-asn'), + peerAddr=self.globalConfig.get('bgp-peer-address')) + + asPath = [int(asn) for asn in self.globalConfig.get('bgp-as-path', str(self.bgpPeering.myASN)).split()] + med = self.globalConfig.getint('bgp-med', 0) + baseAttrs = [bgp.OriginAttribute(), bgp.ASPathAttribute(asPath)] + if med: baseAttrs.append(bgp.MEDAttribute(med)) + + attributes = {} + try: + attributes[(bgp.AFI_INET, bgp.SAFI_UNICAST)] = bgp.FrozenAttributeDict(baseAttrs + [ + bgp.NextHopAttribute(self.globalConfig['bgp-nexthop-ipv4'])]) + except KeyError: + if (bgp.AFI_INET, bgp.SAFI_UNICAST) in BGPFailover.prefixes: + raise ValueError("IPv4 BGP NextHop (global configuration variable 'bgp-nexthop-ipv4') not set") + + try: + attributes[(bgp.AFI_INET6, bgp.SAFI_UNICAST)] = bgp.FrozenAttributeDict(baseAttrs + [ + bgp.MPReachNLRIAttribute((bgp.AFI_INET6, bgp.SAFI_UNICAST, + bgp.IPv6IP(self.globalConfig['bgp-nexthop-ipv6']), []))]) + except KeyError: + if (bgp.AFI_INET6, bgp.SAFI_UNICAST) in BGPFailover.prefixes: + raise ValueError("IPv6 BGP NextHop (global configuration variable 'bgp-nexthop-ipv6') not set") + + advertisements = set([bgp.Advertisement(prefix, attributes[af], af) + for af in attributes.keys() + for prefix in BGPFailover.prefixes.get(af, set())]) + + self.bgpPeering.setEnabledAddressFamilies(set(attributes.keys())) + self.bgpPeering.setAdvertisements(advertisements) + self.bgpPeering.automaticStart() + except Exception: + log.critical("Could not set up BGP peering instance.") + raise + else: + BGPFailover.peerings.append(self.bgpPeering) + reactor.addSystemEventTrigger('before', 'shutdown', self.closeSession, self.bgpPeering) + try: + # Try to listen on the BGP port, not fatal if fails + reactor.listenTCP(bgp.PORT, bgp.BGPServerFactory({self.bgpPeering.peerAddr: self.bgpPeering})) + except Exception: + pass + + def closeSession(self, peering): + log.info("Clearing session to {}".format(peering.peerAddr)) + # Withdraw all announcements + peering.setAdvertisements(set()) + return peering.manualStop() + + @classmethod + def addPrefix(cls, prefix): + try: + if ':' not in prefix: + cls.prefixes.setdefault((bgp.AFI_INET, bgp.SAFI_UNICAST), set()).add(bgp.IPv4IP(prefix)) + else: + cls.prefixes.setdefault((bgp.AFI_INET6, bgp.SAFI_UNICAST), set()).add(bgp.IPv6IP(prefix)) + except NameError: + # bgp not imported + pass diff --git a/pybal/pybal.py b/pybal/coordinator.py similarity index 60% rename from pybal/pybal.py rename to pybal/coordinator.py index 968ffd0..bfc839e 100755 --- a/pybal/pybal.py +++ b/pybal/coordinator.py @@ -2,37 +2,24 @@ """ PyBal -Copyright (C) 2006-2014 by Mark Bergsma <[email protected]> +Copyright (C) 2006-2017 by Mark Bergsma <[email protected]> LVS Squid balancer/monitor for managing the Wikimedia Squid servers using LVS """ +import random +import socket -from __future__ import absolute_import - -from ConfigParser import SafeConfigParser - -import os, sys, signal, socket, random -import logging -from pybal import ipvs, util, config, etcd, instrumentation - -from twisted.python import failure -from twisted.internet import reactor, defer +from twisted.internet import defer from twisted.names import client, dns +from twisted.python import failure + +from pybal import config, util log = util.log -try: - from twisted.internet import inotify -except ImportError: - inotify = None - -try: - from pybal.bgp import bgp -except ImportError: - pass - class Server: + """ Class that maintains configuration and state of a single (real)server """ @@ -42,7 +29,7 @@ DEF_WEIGHT = 10 # Set of attributes allowed to be overridden in a server list - allowedConfigKeys = [ ('host', str), ('weight', int), ('enabled', bool) ] + allowedConfigKeys = [('host', str), ('weight', int), ('enabled', bool)] def __init__(self, host, lvsservice, addressFamily=None): """Constructor""" @@ -52,7 +39,8 @@ if addressFamily: self.addressFamily = addressFamily else: - self.addressFamily = (':' in self.lvsservice.ip) and socket.AF_INET6 or socket.AF_INET + self.addressFamily = ( + ':' in self.lvsservice.ip) and socket.AF_INET6 or socket.AF_INET self.ip = None self.port = 80 self.ip4_addresses = set() @@ -98,11 +86,11 @@ query = dns.Query(self.host, dns.A) lookups.append(client.lookupAddress(self.host, timeout - ).addCallback(self._lookupFinished, socket.AF_INET, query)) + ).addCallback(self._lookupFinished, socket.AF_INET, query)) query = dns.Query(self.host, dns.AAAA) lookups.append(client.lookupIPV6Address(self.host, timeout - ).addCallback(self._lookupFinished, socket.AF_INET6, query)) + ).addCallback(self._lookupFinished, socket.AF_INET6, query)) return defer.DeferredList(lookups).addBoth(self._hostnameResolved) @@ -117,7 +105,7 @@ self.ip6_addresses = ips # TODO: expire TTL - #if self.ip: + # if self.ip: # minTTL = min([r.ttl for r in answers # if r.name == query.name and r.type == query.type]) @@ -134,17 +122,17 @@ ip_addresses = { socket.AF_INET: - self.ip4_addresses, + self.ip4_addresses, socket.AF_INET6: - self.ip6_addresses - }[self.addressFamily] + self.ip6_addresses + }[self.addressFamily] try: if not self.ip or self.ip not in ip_addresses: self.ip = random.choice(list(ip_addresses)) # TODO: (re)pool except IndexError: - return failure.Failure() # TODO: be more specific? + return failure.Failure() # TODO: be more specific? else: return True @@ -182,10 +170,10 @@ """ log.error("Initialization failed for server {}".format(self.host)) - assert self.ready == False + assert self.ready is False self.maintainState() - return False # Continue on success callback chain + return False # Continue on success callback chain def createMonitoringInstances(self, coordinator): """Creates and runs monitoring instances for this Server""" @@ -207,12 +195,15 @@ else: for monitorname in monitorlist: try: - monitormodule = getattr(__import__('pybal.monitors', fromlist=[monitorname.lower()], level=0), monitorname.lower()) + monitormodule = getattr(__import__( + 'pybal.monitors', fromlist=[monitorname.lower()], level=0), monitorname.lower()) except AttributeError: log.err("Monitor {} does not exist".format(monitorname)) else: - monitorclass = getattr(monitormodule, monitorname + 'MonitoringProtocol') - monitor = monitorclass(coordinator, self, lvsservice.configuration) + monitorclass = getattr( + monitormodule, monitorname + 'MonitoringProtocol') + monitor = monitorclass( + coordinator, self, lvsservice.configuration) self.addMonitor(monitor) monitor.run() @@ -220,17 +211,18 @@ """AND quantification of monitor.up over all monitoring instances of a single Server""" # Global status is up iff all monitors report up - return reduce(lambda b,monitor: b and monitor.up, self.monitors, len(self.monitors) != 0) + return reduce(lambda b, monitor: b and monitor.up, self.monitors, len(self.monitors) != 0) def calcPartialStatus(self): """OR quantification of monitor.up over all monitoring instances of a single Server""" # Partial status is up iff one of the monitors reports up - return reduce(lambda b,monitor: b or monitor.up, self.monitors, len(self.monitors) == 0) + return reduce(lambda b, monitor: b or monitor.up, self.monitors, len(self.monitors) == 0) def textStatus(self): return "%s/%s/%s" % (self.enabled and "enabled" or "disabled", - self.up and "up" or (self.calcPartialStatus() and "partially up" or "down"), + self.up and "up" or ( + self.calcPartialStatus() and "partially up" or "down"), self.pooled and "pooled" or "not pooled") def maintainState(self): @@ -253,7 +245,8 @@ # Overwrite configuration self.__dict__.update(configuration) self.maintainState() - self.modified = True # Indicate that this instance previously existed + # Indicate that this instance previously existed + self.modified = True def dumpState(self): """Dump current state of the server""" @@ -267,7 +260,7 @@ dictionary of (allowed) configuration attributes """ - server = cls(hostName, lvsservice) # create a new instance... + server = cls(hostName, lvsservice) # create a new instance... server.merge(configuration) # ...and override attributes server.modified = False @@ -449,189 +442,3 @@ # Assign the updated list of enabled servers to the LVSService instance self.assignServers() - - -class BGPFailover: - """Class for maintaining a BGP session to a router for IP address failover""" - - prefixes = {} - peerings = [] - - def __init__(self, globalConfig): - self.globalConfig = globalConfig - - if self.globalConfig.getboolean('bgp', False): - self.setup() - - def setup(self): - try: - self.bgpPeering = bgp.NaiveBGPPeering(myASN=self.globalConfig.getint('bgp-local-asn'), - peerAddr=self.globalConfig.get('bgp-peer-address')) - - asPath = [int(asn) for asn in self.globalConfig.get('bgp-as-path', str(self.bgpPeering.myASN)).split()] - med = self.globalConfig.getint('bgp-med', 0) - baseAttrs = [bgp.OriginAttribute(), bgp.ASPathAttribute(asPath)] - if med: baseAttrs.append(bgp.MEDAttribute(med)) - - attributes = {} - try: - attributes[(bgp.AFI_INET, bgp.SAFI_UNICAST)] = bgp.FrozenAttributeDict(baseAttrs + [ - bgp.NextHopAttribute(self.globalConfig['bgp-nexthop-ipv4'])]) - except KeyError: - if (bgp.AFI_INET, bgp.SAFI_UNICAST) in BGPFailover.prefixes: - raise ValueError("IPv4 BGP NextHop (global configuration variable 'bgp-nexthop-ipv4') not set") - - try: - attributes[(bgp.AFI_INET6, bgp.SAFI_UNICAST)] = bgp.FrozenAttributeDict(baseAttrs + [ - bgp.MPReachNLRIAttribute((bgp.AFI_INET6, bgp.SAFI_UNICAST, - bgp.IPv6IP(self.globalConfig['bgp-nexthop-ipv6']), []))]) - except KeyError: - if (bgp.AFI_INET6, bgp.SAFI_UNICAST) in BGPFailover.prefixes: - raise ValueError("IPv6 BGP NextHop (global configuration variable 'bgp-nexthop-ipv6') not set") - - advertisements = set([bgp.Advertisement(prefix, attributes[af], af) - for af in attributes.keys() - for prefix in BGPFailover.prefixes.get(af, set())]) - - self.bgpPeering.setEnabledAddressFamilies(set(attributes.keys())) - self.bgpPeering.setAdvertisements(advertisements) - self.bgpPeering.automaticStart() - except Exception: - log.critical("Could not set up BGP peering instance.") - raise - else: - BGPFailover.peerings.append(self.bgpPeering) - reactor.addSystemEventTrigger('before', 'shutdown', self.closeSession, self.bgpPeering) - try: - # Try to listen on the BGP port, not fatal if fails - reactor.listenTCP(bgp.PORT, bgp.BGPServerFactory({self.bgpPeering.peerAddr: self.bgpPeering})) - except Exception: - pass - - def closeSession(self, peering): - log.info("Clearing session to {}".format(peering.peerAddr)) - # Withdraw all announcements - peering.setAdvertisements(set()) - return peering.manualStop() - - @classmethod - def addPrefix(cls, prefix): - try: - if ':' not in prefix: - cls.prefixes.setdefault((bgp.AFI_INET, bgp.SAFI_UNICAST), set()).add(bgp.IPv4IP(prefix)) - else: - cls.prefixes.setdefault((bgp.AFI_INET6, bgp.SAFI_UNICAST), set()).add(bgp.IPv6IP(prefix)) - except NameError: - # bgp not imported - pass - - -def parseCommandLine(configuration): - """ - Parses the command line arguments, and sets configuration options - in dictionary configuration. - """ - import argparse - parser = argparse.ArgumentParser( - description="Load Balancer manager script.", - epilog="See <https://wikitech.wikimedia.org/wiki/PyBal> for more." - ) - parser.add_argument("-c", dest="conffile", help="Configuration file", - default="/etc/pybal/pybal.conf") - parser.add_argument("-n", "--dryrun", action="store_true", - help="Dry Run mode, do not actually update.") - parser.add_argument("-d", "--debug", action="store_true", - help="Debug mode, run in foreground, " - "log to stdout LVS configuration/state, " - "print commands") - args = parser.parse_args() - configuration.update(args.__dict__) - - -def sighandler(signum, frame): - """ - Signal handler - """ - if signum == signal.SIGHUP: - # TODO: reload config - pass - else: - # Stop the reactor if it's running - if reactor.running: - reactor.stop() - - -def installSignalHandlers(): - """ - Installs Unix signal handlers, e.g. to run terminate() on TERM - """ - - signals = [signal.SIGTERM, signal.SIGHUP, signal.SIGINT] - - for sig in signals: - signal.signal(sig, sighandler) - - -def main(): - services, cliconfig = {}, {} - - # Parse the command line - parseCommandLine(cliconfig) - - # Read the configuration file - config = SafeConfigParser() - config.read(cliconfig['conffile']) - - try: - # Install signal handlers - installSignalHandlers() - - for section in config.sections(): - if section != 'global': - cfgtuple = ( - config.get(section, 'protocol'), - config.get(section, 'ip'), - config.getint(section, 'port'), - config.get(section, 'scheduler')) - - # Read the custom configuration options of the LVS section - configdict = util.ConfigDict(config.items(section)) - - # Override with command line options - configdict.update(cliconfig) - - if section != 'global': - services[section] = ipvs.LVSService(section, cfgtuple, configuration=configdict) - crd = Coordinator(services[section], - configUrl=config.get(section, 'config')) - log.info("Created LVS service '{}'".format(section)) - instrumentation.PoolsRoot.addPool(crd.lvsservice.name, crd) - - # Set up BGP - try: - configdict = util.ConfigDict(config.items('global')) - except Exception: - configdict = util.ConfigDict() - configdict.update(cliconfig) - - # Set the logging level - if configdict.get('debug', False): - util.PyBalLogObserver.level = logging.DEBUG - else: - util.PyBalLogObserver.level = logging.INFO - - bgpannouncement = BGPFailover(configdict) - - # Run the web server for instrumentation - if configdict.getboolean('instrumentation', False): - from twisted.web.server import Site - port = configdict.getint('instrumentation_port', 9090) - factory = Site(instrumentation.ServerRoot()) - reactor.listenTCP(port, factory) - - reactor.run() - finally: - log.info("Exiting...") - -if __name__ == '__main__': - main() diff --git a/pybal/ipvs.py b/pybal/ipvs.py index 4429f65..eda6474 100644 --- a/pybal/ipvs.py +++ b/pybal/ipvs.py @@ -5,6 +5,7 @@ LVS state/configuration classes for PyBal """ from . import util +from pybal.bgpfailover import BGPFailover import os log = util.log @@ -182,7 +183,6 @@ self.ipvsManager.Debug = configuration.getboolean('debug', False) if self.configuration.getboolean('bgp', True): - from pybal import BGPFailover # Add service ip to the BGP announcements BGPFailover.addPrefix(self.ip) diff --git a/pybal/main.py b/pybal/main.py new file mode 100755 index 0000000..a64a2ad --- /dev/null +++ b/pybal/main.py @@ -0,0 +1,136 @@ +#!/usr/bin/python + +""" +PyBal +Copyright (C) 2006-2014 by Mark Bergsma <[email protected]> + +LVS Squid balancer/monitor for managing the Wikimedia Squid servers using LVS +""" + +from __future__ import absolute_import + +import argparse +import logging +import signal + +from ConfigParser import SafeConfigParser + +from twisted.internet import reactor + +from pybal import util, ipvs, instrumentation +from pybal.bgpfailover import BGPFailover +from pybal.coordinator import Coordinator + +log = util.log + + +def parseCommandLine(configuration): + """ + Parses the command line arguments, and sets configuration options + in dictionary configuration. + """ + parser = argparse.ArgumentParser( + description="Load Balancer manager script.", + epilog="See <https://wikitech.wikimedia.org/wiki/PyBal> for more." + ) + parser.add_argument("-c", dest="conffile", help="Configuration file", + default="/etc/pybal/pybal.conf") + parser.add_argument("-n", "--dryrun", action="store_true", + help="Dry Run mode, do not actually update.") + parser.add_argument("-d", "--debug", action="store_true", + help="Debug mode, run in foreground, " + "log to stdout LVS configuration/state, " + "print commands") + args = parser.parse_args() + configuration.update(args.__dict__) + + +def sighandler(signum, frame): + """ + Signal handler + """ + if signum == signal.SIGHUP: + # TODO: reload config + pass + else: + # Stop the reactor if it's running + if reactor.running: + reactor.stop() + + +def installSignalHandlers(): + """ + Installs Unix signal handlers, e.g. to run terminate() on TERM + """ + + signals = [signal.SIGTERM, signal.SIGHUP, signal.SIGINT] + + for sig in signals: + signal.signal(sig, sighandler) + + +def main(): + services, cliconfig = {}, {} + + # Parse the command line + parseCommandLine(cliconfig) + + # Read the configuration file + config = SafeConfigParser() + config.read(cliconfig['conffile']) + + try: + # Install signal handlers + installSignalHandlers() + + for section in config.sections(): + if section != 'global': + cfgtuple = ( + config.get(section, 'protocol'), + config.get(section, 'ip'), + config.getint(section, 'port'), + config.get(section, 'scheduler')) + + # Read the custom configuration options of the LVS section + configdict = util.ConfigDict(config.items(section)) + + # Override with command line options + configdict.update(cliconfig) + + if section != 'global': + services[section] = ipvs.LVSService( + section, cfgtuple, configuration=configdict) + crd = Coordinator( + services[section], + configUrl=config.get(section, 'config')) + log.info("Created LVS service '{}'".format(section)) + instrumentation.PoolsRoot.addPool(crd.lvsservice.name, crd) + + # Set up BGP + try: + configdict = util.ConfigDict(config.items('global')) + except Exception: + configdict = util.ConfigDict() + configdict.update(cliconfig) + + # Set the logging level + if configdict.get('debug', False): + util.PyBalLogObserver.level = logging.DEBUG + else: + util.PyBalLogObserver.level = logging.INFO + + BGPFailover(configdict) + + # Run the web server for instrumentation + if configdict.getboolean('instrumentation', False): + from twisted.web.server import Site + port = configdict.getint('instrumentation_port', 9090) + factory = Site(instrumentation.ServerRoot()) + reactor.listenTCP(port, factory) + + reactor.run() + finally: + log.info("Exiting...") + +if __name__ == '__main__': + main() diff --git a/pybal/test/test_ipvs.py b/pybal/test/test_ipvs.py index f636c30..45da486 100644 --- a/pybal/test/test_ipvs.py +++ b/pybal/test/test_ipvs.py @@ -8,7 +8,7 @@ """ import pybal.ipvs import pybal.util -import pybal.pybal +import pybal.bgpfailover from .fixtures import PyBalTestCase, ServerStub @@ -105,7 +105,7 @@ super(LVSServiceTestCase, self).setUp() self.config['dryrun'] = 'true' self.service = ('tcp', '127.0.0.1', 80, 'rr') - pybal.pybal.BGPFailover.prefixes.clear() + pybal.bgpfailover.BGPFailover.prefixes.clear() def stubbedModifyState(cls, cmdList): cls.cmdList = cmdList @@ -129,7 +129,7 @@ self.config['bgp'] = 'true' pybal.ipvs.LVSService('http', self.service, self.config) - self.assertItemsEqual(pybal.pybal.BGPFailover.prefixes, {(1, 1)}) + self.assertItemsEqual(pybal.bgpfailover.BGPFailover.prefixes, {(1, 1)}) def testService(self): """Test `LVSService.service`.""" diff --git a/pybal/test/test_pybal.py b/pybal/test/test_pybal.py index 31e4215..a863996 100644 --- a/pybal/test/test_pybal.py +++ b/pybal/test/test_pybal.py @@ -9,7 +9,7 @@ import sys import mock from .fixtures import PyBalTestCase -from pybal.pybal import parseCommandLine +from pybal.main import parseCommandLine class TestBaseUtils(PyBalTestCase): diff --git a/scripts/pybal b/scripts/pybal old mode 100644 new mode 100755 index a784dcb..9e754d4 --- a/scripts/pybal +++ b/scripts/pybal @@ -7,7 +7,7 @@ LVS Squid balancer/monitor for managing the Wikimedia Squid servers using LVS """ -from pybal import pybal +from pybal import main if __name__ == '__main__': - pybal.main() + main.main() -- To view, visit https://gerrit.wikimedia.org/r/355609 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I05ea6c9657c11330d3c001a1afe1eb2f115ff3b3 Gerrit-PatchSet: 1 Gerrit-Project: operations/debs/pybal Gerrit-Branch: master Gerrit-Owner: Giuseppe Lavagetto <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
