Giuseppe Lavagetto has submitted this change and it was merged.

Change subject: dynamicproxy: add support for kubernetes
......................................................................


dynamicproxy: add support for kubernetes

Bug: T111916
Change-Id: I9821bafc37caaeb39ec7bd751bd76d73401e7ec0
---
A modules/toollabs/files/kube2dynproxy.py
A modules/toollabs/manifests/kube2proxy.pp
M modules/toollabs/manifests/proxy.pp
A modules/toollabs/templates/initscripts/kube2proxy.systemd.erb
A modules/toollabs/templates/initscripts/kube2proxy.upstart.erb
5 files changed, 323 insertions(+), 0 deletions(-)

Approvals:
  Giuseppe Lavagetto: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/modules/toollabs/files/kube2dynproxy.py b/modules/toollabs/files/kube2dynproxy.py
new file mode 100755
index 0000000..3f1c5ec
--- /dev/null
+++ b/modules/toollabs/files/kube2dynproxy.py
@@ -0,0 +1,233 @@
+#!/usr/bin/env python3
+import requests
+import argparse
+import redis
+import logging
+import os
+import yaml
+import json
+
+# Format string handed to logging.basicConfig() in main()
+LOG_FORMAT = "%(asctime)s %(message)s"
+# Root logger, used by every class and function in this script
+log = logging.getLogger()
+
+# Name of the redis set in which Service.write() records the services
+# it has written, stored as "<namespace>/<name>/<route>" strings
+services_registry = 'k8s_services'
+
+
+class KubeAuth(requests.auth.AuthBase):
+    def __init__(self, token):
+        self.token = token
+
+    def __call__(self, r):
+        r.headers['Authorization'] = 'Bearer {}'.format(self.token)
+        return r
+
+
+class KubeClient(object):
+
+    def __init__(self, master, ca_cert, token, conn=redis.Redis()):
+        self.base_url = master
+        self.session = requests.Session()
+        self.session.auth = KubeAuth(token)
+        self.capath = ca_cert
+        self.conn = conn
+        self.base_params = {'labelSelector': 'toollabs=true'}
+        self.resourceVersion = 0
+
+    def get_services(self):
+        """Gets an initial list of all services"""
+        log.debug('Searching for existing services')
+        resp = self.session.get(self.url_for('/services'),
+                                params=self.base_params, verify=self.capath)
+        services = []
+        try:
+            servicelist = resp.json()
+            self.resourceVersion = 
int(servicelist['metadata']['resourceVersion'])
+            log.debug("global resourceVersion now at %s", self.resourceVersion)
+            for servicedata in servicelist['items']:
+                services.append(KubeClient._resp_to_service(servicedata))
+            return services
+        except:
+            log.error("The services list was not correctly parsed.")
+            raise
+
+    def get_service(self, name, namespace):
+        resp = self.session.get(
+            self.url_for('/namespaces/{}/services/{}'.format(namespace, name)))
+        if resp.status_code != '200':
+            raise ValueError("Found an unexpected response code %s when "
+                             "searching for service %s" % (resp.status_code,
+                                                           name))
+
+    def sync_services(self):
+        """Does a full sync of the services, returns a list
+        of the active ones."""
+        services = self.get_services()
+        registered_services = self.conn.smembers(services_registry)
+        actual_services = set([str(serv) for serv in services])
+        services_to_delete = registered_services - actual_services
+        for service in services_to_delete:
+            namespace, name, route = service.split('/')
+            try:
+                s = Service(name, namespace)
+                s.route = route
+                s.action = 'DELETED'
+                services.append(s)
+            except ValueError as e:
+                log.warning("Could not find service %s, skipping", service)
+                # TODO: remove it from the redis list anyways?
+
+        return services
+
+    @property
+    def services(self):
+        """yields any initial service and then any subsequent change"""
+        # Custom-built badass event loop
+        # Because python is webscale!!1!
+        while True:
+            # Note: if this fails we don't really want to recover
+            for service in self.sync_services():
+                yield service
+            try:
+                # this should watch forever
+                resp = self.watch_services()
+                for line in resp.iter_lines():
+                    yield self._resp_to_service(json.loads(line))
+            except Exception as e:
+                # If watching fails, start from scratch
+                log.error("An error occurred while watching for changes, "
+                          "starting from scratch")
+                log.exception("Exception was: %s", e, exc_info=True)
+                pass
+
+    def watch_services(self):
+        """Request the watch url and return the response handle."""
+        params = {'resourceVersion': self.resourceVersion, 'watch': True}
+        params.update(self.base_params)
+        return self.session.get(self.url_for('/watch/services'),
+                                params=params, stream=True)
+
+    def url_for(self, path):
+        """Returns the full url for a specific path."""
+        return "{}/api/v1{}".format(self.base_url, path)
+
+    @staticmethod
+    def _resp_to_service(data, action='ADDED'):
+        """Transforms the response object into a Service object"""
+        obj = data.get('object', data)
+        action = data.get('type', action)
+        metadata = obj.get('metadata')
+        spec = obj.get('spec')
+        name = metadata.get('name')
+        namespace = metadata.get('namespace')
+        ipaddr = spec.get('clusterIP')
+        port = spec.get('ports').pop().get('port', "80")
+        labels = metadata.get('labels', {})
+        s = Service(name, namespace, ipaddr, port, labels)
+        s.action = action
+        return s
+
+
+class Service(object):
+    default_route = '.*'
+
+    def __init__(self, name, namespace, ipaddr=None, port=None, labels=None):
+        self.name = name
+        self.namespace = namespace
+        self.ipaddr = ipaddr
+        self.labels = labels
+        self.port = port
+        self.action = 'ADDED'
+        self._route = None
+
+    @property
+    def url(self):
+        return "http://{}:{}".format(self.ipaddr, self.port)
+
+    @property
+    def route(self):
+        if not self._route:
+            try:
+                self._route = "/{}.*".format(
+                    self.labels.get('toollabs-proxy-path'))
+            except:
+                self._route = self.default_route
+        return self._route
+
+    @route.setter
+    def route(self, route):
+        if not route:
+            self._route = self.default_route
+        else:
+            self._route = "/{}.*".format(route)
+
+    def write(self, conn):
+        # TODO: for now it's ok to use the tool name as a
+        # prefix; in the future we'll need to probably refine this a bit
+        key = "prefix:%s" % self.name
+        log.info("Service %s is %s", self, self.action)
+        if self.action == 'ADDED':
+            conn.hset(key, self.route, self.url)
+            conn.sadd(services_registry, str(self))
+        elif self.action == 'MODIFIED':
+            oldroutes = conn.hgetall(key)
+            conn.hset(key, self.route, self.url)
+            conn.sadd(services_registry, str(self))
+            for route, url in oldroutes.items():
+                # a different url was pointing to our service
+                if route != self.route and url == self.url:
+                    log.info("Removing stale route %s for service %s/%s",
+                             route, self.namespace, self.name)
+                    conn.hdel(key, route)
+                    setkey = str(self).replace(self.route, route)
+                    conn.srem(setkey)
+        elif self.action == 'DELETED':
+            conn.hdel(key, self.route)
+            conn.srem(services_registry, str(self))
+
+    def __str__(self):
+        r = self.route
+        if r.startswith('/'):
+            r = r[1:]
+        return "{}/{}/{}".format(self.namespace, self.name, r)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Kubernetes to dynamicproxy syncronizer")
+    parser.add_argument('--config', default="",
+                        help="Optional yaml config file")
+    parser.add_argument('-d', '--debug', action='store_true')
+    args = parser.parse_args()
+
+    if args.debug:
+        level = logging.DEBUG
+    else:
+        level = logging.INFO
+    logging.basicConfig(format=LOG_FORMAT, level=level)
+
+    if args.config:
+        with open(args.config, 'r') as fh:
+
+            config = yaml.load(fh)
+    else:
+        config = {
+            'redis': os.environ.get('K2D_REDIS_HOST', 'localhost:6379'),
+            'kubernetes': {
+                'master': os.environ.get(
+                    'K2D_KUBE_MASTER',
+                    'https://tools-k8s-master-01.tools.eqiad.wmflabs:6443'),
+                'ca_cert': os.environ.get('K2D_KUBE_CA',
+                                          '/var/lib/kubernetes/ssl/ca.pem'),
+                'token': os.environ.get('K2D_TOKEN', 'test')
+            }
+        }
+
+    rhost, rport = config['redis'].split(':')
+    conn = redis.Redis(host=rhost, port=rport)
+    config['kubernetes']['conn'] = conn
+    kubecl = KubeClient(**config['kubernetes'])
+    for service in kubecl.services:
+        service.write(conn)
+
+# Run main() only when executed as a script, not when imported
+if __name__ == '__main__':
+    main()
diff --git a/modules/toollabs/manifests/kube2proxy.pp b/modules/toollabs/manifests/kube2proxy.pp
new file mode 100644
index 0000000..bc0252d
--- /dev/null
+++ b/modules/toollabs/manifests/kube2proxy.pp
@@ -0,0 +1,52 @@
+# Class: toollabs::kube2proxy
+#
+# Installs the kube2proxy script, the user it runs as and the service
+# unit that keeps it running.
+#
+# Parameters:
+#   ensure     - ensure value for the installed script
+#   ssldir     - not referenced anywhere in this class
+#   kubemaster - url of the kubernetes master, used by the init templates
+#   kube_token - bearer token, used by the init templates
+class toollabs::kube2proxy(
+    $ensure='present',
+    $ssldir='/var/lib/puppet/ssl/',
+    $kubemaster='https://tools-k8s-master-01.tools.eqiad.wmflabs:6443',
+    $kube_token='test',
+    ){
+
+    file { '/usr/local/sbin/kube2proxy':
+        ensure => $ensure,
+        owner  => 'root',
+        group  => 'root',
+        mode   => '0555',
+        source => 'puppet:///modules/toollabs/kube2dynproxy.py',
+    }
+
+    group { 'kubeproxy':
+        system => true,
+    }
+
+    user { 'kubeproxy':
+        ensure => present,
+        gid    => 'kubeproxy',
+        shell  => '/bin/false',
+        home   => '/nonexistent',
+        system => true,
+    }
+
+    # Trusty's python-requests package is buggy
+    # and would break watching kubernetes for changes
+    package { 'requests':
+        ensure   => latest,
+        provider => 'pip',
+        require  => Package['python-requests'],
+    }
+
+    include k8s::ssl
+
+    # Temporarily not run on non-active proxies
+    # note that having redis based replication
+    # instead of having processes syncing every proxy
+    # with kubernetes is a bad idea, we're doing it just
+    # because that's how OGE integration worked.
+    #
+    # The comparison used to be '!=', which enabled the service on every
+    # proxy except the active one - the opposite of the intent above.
+    # NOTE(review): $active_proxy is not declared in this class; confirm
+    # which scope it is resolved from.
+    $should_run = ($::hostname == $active_proxy)
+    # NOTE(review): the unit is titled 'kubesync' while the templates
+    # added with it are named kube2proxy.*.erb - confirm
+    # base::service_unit can find them under this title.
+    base::service_unit{ 'kubesync':
+        ensure    => $should_run,
+        refresh   => true,
+        systemd   => true,
+        upstart   => true,
+        subscribe => [
+            File['/usr/local/sbin/kube2proxy'],
+            File['/var/lib/kubernetes/ssl/certs/ca.pem'],
+        ],
+    }
+
+}
diff --git a/modules/toollabs/manifests/proxy.pp b/modules/toollabs/manifests/proxy.pp
index 49c67af..dbf7493 100644
--- a/modules/toollabs/manifests/proxy.pp
+++ b/modules/toollabs/manifests/proxy.pp
@@ -42,7 +42,11 @@
         web_domain           => $web_domain,
     }
 
+
     $proxy_nodes = join($proxies, ' ')
+
+    include ::toollabs::kube2proxy
+
     # Open up redis to all proxies!
     ferm::service { 'redis-replication':
         proto  => 'tcp',
diff --git a/modules/toollabs/templates/initscripts/kube2proxy.systemd.erb b/modules/toollabs/templates/initscripts/kube2proxy.systemd.erb
new file mode 100644
index 0000000..e04c953
--- /dev/null
+++ b/modules/toollabs/templates/initscripts/kube2proxy.systemd.erb
@@ -0,0 +1,17 @@
+# Kube2proxy
+#
+# A simple service to sync information from the kubernetes cluster
+# to toollabs' own dynamic proxy.
+[Unit]
+# Values are taken literally, so the description is left unquoted
+Description=Syncronizer for kubernetes-dynamicproxy integration
+# Dependencies must name the full unit, including its type suffix
+After=redis-server.service
+Requires=redis-server.service
+
+[Service]
+User=kubeproxy
+Environment="K2D_KUBE_CA=/var/lib/kubernetes/ssl/certs/ca.pem"
+Environment="K2D_TOKEN=<%= @kube_token %>"
+Environment="K2D_KUBE_MASTER=<%= @kubemaster %>"
+# Optional overrides; the leading '-' makes a missing file non-fatal.
+# The path must not be quoted or it is no longer an absolute path.
+EnvironmentFile=-/etc/default/kube2proxy
+ExecStart=/usr/local/sbin/kube2proxy
+
+[Install]
+# Without an install target, enabling the unit has no effect
+WantedBy=multi-user.target
+
diff --git a/modules/toollabs/templates/initscripts/kube2proxy.upstart.erb b/modules/toollabs/templates/initscripts/kube2proxy.upstart.erb
new file mode 100644
index 0000000..169dcb9
--- /dev/null
+++ b/modules/toollabs/templates/initscripts/kube2proxy.upstart.erb
@@ -0,0 +1,17 @@
+# Kube2proxy
+#
+# A simple service to sync information from the kubernetes cluster
+# to toollabs' own dynamic proxy.
+description "Syncronizer for kubernetes-dynamicproxy integration"
+
+# There is no point in trying to start (and fail) until redis is up
+start on started redis-server
+
+setuid kubeproxy
+setgid kubeproxy
+
+# Quote only the value: wrapping the whole KEY='value' pair in double
+# quotes leaves the single quotes inside the value itself.
+env K2D_KUBE_CA="/var/lib/kubernetes/ssl/certs/ca.pem"
+env K2D_TOKEN="<%= @kube_token %>"
+env K2D_KUBE_MASTER="<%= @kubemaster %>"
+
+exec /usr/local/sbin/kube2proxy

-- 
To view, visit https://gerrit.wikimedia.org/r/241908
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I9821bafc37caaeb39ec7bd751bd76d73401e7ec0
Gerrit-PatchSet: 7
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Giuseppe Lavagetto <glavage...@wikimedia.org>
Gerrit-Reviewer: Giuseppe Lavagetto <glavage...@wikimedia.org>
Gerrit-Reviewer: Tim Landscheidt <t...@tim-landscheidt.de>
Gerrit-Reviewer: Yuvipanda <yuvipa...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to