Giuseppe Lavagetto has submitted this change and it was merged. Change subject: dynamicproxy: add support for kubernetes ......................................................................
dynamicproxy: add support for kubernetes

Bug: T111916
Change-Id: I9821bafc37caaeb39ec7bd751bd76d73401e7ec0
---
A modules/toollabs/files/kube2dynproxy.py
A modules/toollabs/manifests/kube2proxy.pp
M modules/toollabs/manifests/proxy.pp
A modules/toollabs/templates/initscripts/kube2proxy.systemd.erb
A modules/toollabs/templates/initscripts/kube2proxy.upstart.erb
5 files changed, 393 insertions(+), 0 deletions(-)

Approvals:
  Giuseppe Lavagetto: Looks good to me, approved
  jenkins-bot: Verified

diff --git a/modules/toollabs/files/kube2dynproxy.py b/modules/toollabs/files/kube2dynproxy.py
new file mode 100755
index 0000000..3f1c5ec
--- /dev/null
+++ b/modules/toollabs/files/kube2dynproxy.py
@@ -0,0 +1,299 @@
+#!/usr/bin/env python3
+"""Sync kubernetes services into the dynamicproxy redis store.
+
+Every service matching the ``toollabs=true`` label selector is listed, then
+the API is watched for changes. Each service is mirrored as a
+route -> backend url field of the redis hash ``prefix:<service name>``.
+"""
+import requests
+import argparse
+import redis
+import logging
+import os
+import yaml
+import json
+
+LOG_FORMAT = "%(asctime)s %(message)s"
+log = logging.getLogger()
+
+# Redis set with one "<namespace>/<name>/<route>" entry per synced service
+services_registry = 'k8s_services'
+
+
+def _to_str(value):
+    """Return value as text; redis-py and requests yield bytes on python 3."""
+    if isinstance(value, bytes):
+        return value.decode('utf-8')
+    return value
+
+
+class KubeAuth(requests.auth.AuthBase):
+    """requests auth hook adding a bearer token to each request."""
+
+    def __init__(self, token):
+        self.token = token
+
+    def __call__(self, r):
+        r.headers['Authorization'] = 'Bearer {}'.format(self.token)
+        return r
+
+
+class KubeClient(object):
+    """Small client for the services endpoints of the kubernetes API."""
+
+    def __init__(self, master, ca_cert, token, conn=None):
+        """Set up the HTTP session.
+
+        master is the base url of the API server, ca_cert the path of the
+        CA bundle used to verify it, token the bearer token, and conn the
+        redis connection (a default redis.Redis() when omitted).
+        """
+        self.base_url = master
+        self.session = requests.Session()
+        self.session.auth = KubeAuth(token)
+        self.capath = ca_cert
+        # Created here and not as a default argument: a default is built
+        # once at import time and shared by every instance.
+        self.conn = redis.Redis() if conn is None else conn
+        self.base_params = {'labelSelector': 'toollabs=true'}
+        self.resourceVersion = 0
+
+    def get_services(self):
+        """Gets an initial list of all services"""
+        log.debug('Searching for existing services')
+        resp = self.session.get(self.url_for('/services'),
+                                params=self.base_params, verify=self.capath)
+        try:
+            servicelist = resp.json()
+            self.resourceVersion = int(
+                servicelist['metadata']['resourceVersion'])
+            log.debug("global resourceVersion now at %s",
+                      self.resourceVersion)
+            return [self._resp_to_service(servicedata)
+                    for servicedata in servicelist['items']]
+        except Exception:
+            log.error("The services list was not correctly parsed.")
+            raise
+
+    def get_service(self, name, namespace):
+        """Check that a service exists on the API server.
+
+        Raises ValueError when the response code is not 200.
+        """
+        resp = self.session.get(
+            self.url_for('/namespaces/{}/services/{}'.format(namespace,
+                                                             name)),
+            verify=self.capath)
+        # status_code is an int; the previous comparison with the string
+        # '200' was always true, so every lookup raised.
+        if resp.status_code != 200:
+            raise ValueError("Found an unexpected response code %s when "
+                             "searching for service %s" % (resp.status_code,
+                                                           name))
+
+    def sync_services(self):
+        """Does a full sync of the services.
+
+        Returns the active services, followed by a DELETED entry for each
+        service found in the redis registry but no longer in kubernetes.
+        """
+        services = self.get_services()
+        registered_services = set(
+            _to_str(entry)
+            for entry in self.conn.smembers(services_registry))
+        actual_services = set(str(serv) for serv in services)
+        for service in registered_services - actual_services:
+            try:
+                s = Service.from_registry(service)
+            except ValueError:
+                log.warning("Could not find service %s, skipping", service)
+                # TODO: remove it from the redis list anyways?
+                continue
+            s.action = 'DELETED'
+            services.append(s)
+        return services
+
+    @property
+    def services(self):
+        """yields any initial service and then any subsequent change"""
+        # Event loop: full sync, then stream watch events until the watch
+        # breaks, then start over.
+        while True:
+            # Note: if this fails we don't really want to recover
+            for service in self.sync_services():
+                yield service
+            try:
+                # this should watch forever
+                resp = self.watch_services()
+                for line in resp.iter_lines():
+                    if not line:
+                        # keep-alive newline, nothing to parse
+                        continue
+                    yield self._resp_to_service(json.loads(_to_str(line)))
+            except Exception as e:
+                # If watching fails, start from scratch
+                log.error("An error occurred while watching for changes, "
+                          "starting from scratch")
+                log.exception("Exception was: %s", e, exc_info=True)
+
+    def watch_services(self):
+        """Request the watch url and return the response handle."""
+        params = {'resourceVersion': self.resourceVersion, 'watch': True}
+        params.update(self.base_params)
+        # Verify against the same CA bundle get_services() uses
+        return self.session.get(self.url_for('/watch/services'),
+                                params=params, stream=True,
+                                verify=self.capath)
+
+    def url_for(self, path):
+        """Returns the full url for a specific path."""
+        return "{}/api/v1{}".format(self.base_url, path)
+
+    @staticmethod
+    def _resp_to_service(data, action='ADDED'):
+        """Transforms the response object into a Service object"""
+        obj = data.get('object', data)
+        action = data.get('type', action)
+        metadata = obj.get('metadata')
+        spec = obj.get('spec')
+        name = metadata.get('name')
+        namespace = metadata.get('namespace')
+        ipaddr = spec.get('clusterIP')
+        # Index instead of pop(): do not mutate the parsed response
+        port = spec.get('ports')[-1].get('port', "80")
+        labels = metadata.get('labels', {})
+        s = Service(name, namespace, ipaddr, port, labels)
+        s.action = action
+        return s
+
+
+class Service(object):
+    """A kubernetes service and the proxy route pointing at it."""
+    default_route = '.*'
+
+    def __init__(self, name, namespace, ipaddr=None, port=None, labels=None):
+        self.name = name
+        self.namespace = namespace
+        self.ipaddr = ipaddr
+        self.labels = labels
+        self.port = port
+        self.action = 'ADDED'
+        self._route = None
+
+    @classmethod
+    def from_registry(cls, entry):
+        """Rebuild a Service from a "<namespace>/<name>/<route>" entry.
+
+        Raises ValueError when the entry has fewer than three parts.
+        """
+        # maxsplit=2: the route part may itself contain slashes
+        namespace, name, route = entry.split('/', 2)
+        s = cls(name, namespace)
+        # __str__ drops the leading slash of the route. Restore it here
+        # instead of using the setter, which would append a second ".*".
+        if route == cls.default_route:
+            s._route = route
+        else:
+            s._route = '/' + route
+        return s
+
+    @property
+    def url(self):
+        """Backend url of the service."""
+        return "http://{}:{}".format(self.ipaddr, self.port)
+
+    @property
+    def route(self):
+        """Route built from the toollabs-proxy-path label, else default."""
+        if not self._route:
+            # A missing label used to be formatted as "/None.*"
+            path = (self.labels or {}).get('toollabs-proxy-path')
+            if path:
+                self._route = "/{}.*".format(path)
+            else:
+                self._route = self.default_route
+        return self._route
+
+    @route.setter
+    def route(self, route):
+        if not route:
+            self._route = self.default_route
+        else:
+            self._route = "/{}.*".format(route)
+
+    def write(self, conn):
+        """Apply self.action (ADDED, MODIFIED or DELETED) to redis."""
+        # TODO: for now it's ok to use the tool name as a
+        # prefix; in the future we'll need to probably refine this a bit
+        key = "prefix:%s" % self.name
+        log.info("Service %s is %s", self, self.action)
+        if self.action == 'ADDED':
+            conn.hset(key, self.route, self.url)
+            conn.sadd(services_registry, str(self))
+        elif self.action == 'MODIFIED':
+            oldroutes = conn.hgetall(key)
+            conn.hset(key, self.route, self.url)
+            conn.sadd(services_registry, str(self))
+            for route, url in oldroutes.items():
+                route, url = _to_str(route), _to_str(url)
+                # a different route was pointing to our service
+                if route != self.route and url == self.url:
+                    log.info("Removing stale route %s for service %s/%s",
+                             route, self.namespace, self.name)
+                    conn.hdel(key, route)
+                    # srem needs the set name; it was missing before
+                    conn.srem(services_registry,
+                              self._registry_entry(route))
+        elif self.action == 'DELETED':
+            conn.hdel(key, self.route)
+            conn.srem(services_registry, str(self))
+
+    def _registry_entry(self, route):
+        """Return the registry member naming this service under route."""
+        if route.startswith('/'):
+            route = route[1:]
+        return "{}/{}/{}".format(self.namespace, self.name, route)
+
+    def __str__(self):
+        return self._registry_entry(self.route)
+
+
+def main():
+    """Parse arguments, connect to redis and mirror service events forever."""
+    parser = argparse.ArgumentParser(
+        description="Kubernetes to dynamicproxy syncronizer")
+    parser.add_argument('--config', default="",
+                        help="Optional yaml config file")
+    parser.add_argument('-d', '--debug', action='store_true')
+    args = parser.parse_args()
+
+    level = logging.DEBUG if args.debug else logging.INFO
+    logging.basicConfig(format=LOG_FORMAT, level=level)
+
+    if args.config:
+        with open(args.config, 'r') as fh:
+            # safe_load: the config is plain data, so refuse yaml tags
+            # that construct arbitrary python objects
+            config = yaml.safe_load(fh)
+    else:
+        config = {
+            'redis': os.environ.get('K2D_REDIS_HOST', 'localhost:6379'),
+            'kubernetes': {
+                'master': os.environ.get(
+                    'K2D_KUBE_MASTER',
+                    'https://tools-k8s-master-01.tools.eqiad.wmflabs:6443'),
+                'ca_cert': os.environ.get('K2D_KUBE_CA',
+                                          '/var/lib/kubernetes/ssl/ca.pem'),
+                'token': os.environ.get('K2D_TOKEN', 'test')
+            }
+        }
+
+    rhost, rport = config['redis'].rsplit(':', 1)
+    conn = redis.Redis(host=rhost, port=int(rport))
+    config['kubernetes']['conn'] = conn
+    kubecl = KubeClient(**config['kubernetes'])
+    for service in kubecl.services:
+        service.write(conn)
+
+if __name__ == '__main__':
+    main()
diff --git a/modules/toollabs/manifests/kube2proxy.pp b/modules/toollabs/manifests/kube2proxy.pp
new file mode 100644
index 0000000..bc0252d
--- /dev/null
+++ b/modules/toollabs/manifests/kube2proxy.pp
@@ -0,0 +1,56 @@
+class toollabs::kube2proxy(
+    $ensure='present',
+    $ssldir='/var/lib/puppet/ssl/',
+    $kubemaster='https://tools-k8s-master-01.tools.eqiad.wmflabs:6443',
+    $kube_token='test',
+    ){
+
+    file { '/usr/local/sbin/kube2proxy':
+        ensure => $ensure,
+        owner  => 'root',
+        group  => 'root',
+        mode   => '0555',
+        source => 'puppet:///modules/toollabs/kube2dynproxy.py',
+    }
+
+    group { 'kubeproxy':
+        system => true,
+    }
+
+    user { 'kubeproxy':
+        ensure => present,
+        gid    => 'kubeproxy',
+        shell  => '/bin/false',
+        home   => '/nonexistent',
+        system => true,
+    }
+
+    # Trusty's python-requests package is buggy
+    # and would break watching kubernetes for changes
+    package { 'requests':
+        provider => 'pip',
+        ensure   => latest,
+        require  => Package['python-requests'],
+    }
+
+    include k8s::ssl
+
+    # Temporarily not run on non-active proxies
+    # note that having redis based replication
+    # instead of having processes syncing every proxy
+    # with kubernetes is a bad idea, we're doing it just
+    # because that's how OGE integration worked.
+    # NOTE(review): the expression that follows is true on every host that
+    # is NOT $active_proxy, which contradicts the first line of this
+    # comment, and its boolean result is used as `ensure` -- confirm the
+    # intended condition and that base::service_unit accepts a boolean.
+    $should_run = ($::hostname != $active_proxy)
+    base::service_unit{ 'kubesync':
+        ensure    => $should_run,
+        refresh   => true,
+        systemd   => true,
+        upstart   => true,
+        subscribe => [File['/usr/local/sbin/kube2proxy'],File['/var/lib/kubernetes/ssl/certs/ca.pem']],
+    }
+
+}
diff --git a/modules/toollabs/manifests/proxy.pp b/modules/toollabs/manifests/proxy.pp
index 49c67af..dbf7493 100644
--- a/modules/toollabs/manifests/proxy.pp
+++ b/modules/toollabs/manifests/proxy.pp
@@ -42,7 +42,11 @@ web_domain => $web_domain, } + $proxy_nodes = join($proxies, ' ') + + include ::toollabs::kube2proxy + # Open up redis to all proxies! ferm::service { 'redis-replication': proto => 'tcp',
diff --git a/modules/toollabs/templates/initscripts/kube2proxy.systemd.erb b/modules/toollabs/templates/initscripts/kube2proxy.systemd.erb
new file mode 100644
index 0000000..e04c953
--- /dev/null
+++ b/modules/toollabs/templates/initscripts/kube2proxy.systemd.erb
@@ -0,0 +1,17 @@
+# Kube2proxy
+#
+# A simple service to sync information from the kubernetes cluster
+# to toollabs' own dynamic proxy.
+[Unit]
+Description="Syncronizer for kubernetes-dynamicproxy integration"
+After=redis-server.service
+Requires=redis-server.service
+
+[Service]
+User=kubeproxy
+Environment="K2D_KUBE_CA=/var/lib/kubernetes/ssl/certs/ca.pem"
+Environment="K2D_TOKEN=<%= @kube_token %>"
+Environment="K2D_KUBE_MASTER=<%= @kubemaster %>"
+EnvironmentFile=-/etc/default/kube2proxy
+ExecStart="/usr/local/sbin/kube2proxy"
+
diff --git a/modules/toollabs/templates/initscripts/kube2proxy.upstart.erb b/modules/toollabs/templates/initscripts/kube2proxy.upstart.erb
new file mode 100644
index 0000000..169dcb9
--- /dev/null
+++ b/modules/toollabs/templates/initscripts/kube2proxy.upstart.erb
@@ -0,0 +1,17 @@
+# Kube2proxy
+#
+# A simple service to sync information from the kubernetes cluster
+# to toollabs' own dynamic proxy.
+description "Syncronizer for kubernetes-dynamicproxy integration"
+
+# There is no point in trying to start (and fail) until redis is up
+start on started redis-server
+
+setuid kubeproxy
+setgid kubeproxy
+
+env K2D_KUBE_CA="/var/lib/kubernetes/ssl/certs/ca.pem"
+env K2D_TOKEN="<%= @kube_token %>"
+env K2D_KUBE_MASTER="<%= @kubemaster %>"
+
+exec /usr/local/sbin/kube2proxy
-- 
To view, visit https://gerrit.wikimedia.org/r/241908
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I9821bafc37caaeb39ec7bd751bd76d73401e7ec0
Gerrit-PatchSet: 7
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Giuseppe Lavagetto <glavage...@wikimedia.org>
Gerrit-Reviewer: Giuseppe Lavagetto <glavage...@wikimedia.org>
Gerrit-Reviewer: Tim Landscheidt <t...@tim-landscheidt.de>
Gerrit-Reviewer: Yuvipanda <yuvipa...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits