jenkins-bot has submitted this change and it was merged.

Change subject: Add EtcdConfigurationObserver
......................................................................


Add EtcdConfigurationObserver

Add a simple etcd client implementation that can watch a key for changes, using
the etcd API's wait / waitIndex parameters to ensure consistency.

Change-Id: Ia6d898df142ebd17384ff50beadf63e0a2bbcfca
---
M pybal/__init__.py
A pybal/etcd.py
M pybal/pybal.py
A pybal/version.py
M requirements.txt
M setup.py
M tox.ini
7 files changed, 170 insertions(+), 4 deletions(-)

Approvals:
  Giuseppe Lavagetto: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/pybal/__init__.py b/pybal/__init__.py
index e838db5..ade8bc3 100644
--- a/pybal/__init__.py
+++ b/pybal/__init__.py
@@ -6,9 +6,8 @@
 """
 import test
 
-__version__ = '1.6'
+from .version import *
 
-USER_AGENT_STRING = 'PyBal/%s' % __version__
 
 __all__ = ('ipvs', 'monitor', 'pybal', 'util', 'monitors', 'bgp',
            'config', 'instrumentation', 'USER_AGENT_STRING')
diff --git a/pybal/etcd.py b/pybal/etcd.py
new file mode 100644
index 0000000..eee4896
--- /dev/null
+++ b/pybal/etcd.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+"""
+  PyBal etcd client
+  ~~~~~~~~~~~~~~~~~
+
+  This module allows PyBal to be configured via etcd.
+
+"""
+from __future__ import absolute_import
+
+import copy
+import json
+import urllib
+
+from twisted.internet import defer, reactor, ssl
+from twisted.python import failure
+from twisted.web import error
+from twisted.web.client import HTTPClientFactory
+from twisted.web.http import HTTPClient, urlparse
+
+from .config import ConfigurationObserver
+from .version import USER_AGENT_STRING
+from .util import log
+
+def decode_node(node):
+    """Decode an individual node from an etcd response."""
+    key = node['key'].rsplit('/', 1)[-1]
+    value = json.loads(node['value'])
+    pooled = value.pop('pooled', None)
+    if pooled is not None:
+        value['enabled'] = pooled == 'yes'
+    return key, value
+
+
+def decode_etcd_data(data):
+    """Simplify an etcd response by stripping leading path components
+    from key names, decoding JSON values, and removing etcd metadata."""
+    node = data['node']
+    if node.get('dir'):
+        return dict(decode_node(child) for child in node['nodes'])
+    else:
+        key, value = decode_node(node)
+        return {key: value}
+
+
+class EtcdClient(HTTPClient):
+    """Represents a client for the etcd HTTP API."""
+    etcdIndex = 0
+
+    def connectionMade(self):
+        self.sendCommand('GET', self.factory.getPath())
+        self.sendHeader('Host', self.factory.host)
+        self.sendHeader('User-Agent', self.factory.agent)
+        self.endHeaders()
+
+    def handleStatus(self, version, status, message):
+        self.version = version
+        self.status = status
+        self.message = message
+
+    def handleResponse(self, response):
+        if self.status != '200':
+            err = error.Error(self.status, self.message, response)
+            self.factory.onFailure(failure.Failure(err))
+        else:
+            try:
+                config = json.loads(response)
+            except Exception as e:
+                self.factory.onFailure(failure.Failure())
+            else:
+                self.factory.onUpdate(config, self.etcdIndex)
+        self.transport.loseConnection()
+
+    def handleHeader(self, key, val):
+        if key == 'X-Etcd-Index':
+            self.etcdIndex = int(val)
+
+    def timeout(self):
+        err = defer.TimeoutError(
+            'Retrieving key %s from %s took longer than %s seconds.' %
+            (self.factory.key, self.factory.host, self.factory.timeout))
+        self.factory.onFailure(err)
+        self.transport.loseConnection()
+
+
+class EtcdConfigurationObserver(ConfigurationObserver, HTTPClientFactory):
+    """A factory that will continuously monitor an etcd key for changes."""
+
+    urlScheme = 'etcd://'
+
+    agent = USER_AGENT_STRING
+    method = 'GET'
+    protocol = EtcdClient
+    scheme = 'https'
+    timeout = 0
+    followRedirect = False
+    afterFoundGet = False
+
+    def __init__(self, coordinator, configUrl):
+        self.coordinator = coordinator
+        self.configUrl = configUrl
+        self.host, self.port, self.key = self.parseConfigUrl(configUrl)
+        self.waitIndex = None
+        self.lastConfig = {}
+
+    def startObserving(self):
+        """Start (or re-start) watching the configuration file for changes."""
+        reactor.connectSSL(self.host, self.port, self,
+                           ssl.ClientContextFactory())
+
+    def parseConfigUrl(self, configUrl):
+        parsed = urlparse(configUrl)
+        return parsed.hostname, parsed.port or 2379, parsed.path
+
+    def getPath(self):
+        path = '/v2/keys/%s' % self.key.lstrip('/')
+        params = {'recursive': 'true'}
+        if self.waitIndex is not None:
+            params['waitIndex'] = self.waitIndex
+            params['wait'] = 'true'
+        path = '%s?%s' % (path, urllib.urlencode(params))
+        return path
+
+    def clientConnectionFailed(self, connector, reason):
+        connector.connect()
+
+    def clientConnectionLost(self, connector, unused_reason):
+        connector.connect()
+
+    def getMaxModifiedIndex(self, root):
+        root = root.get('node', root)
+        index = root['modifiedIndex']
+        for node in root.get('nodes', ()):
+            index = max(index, self.getMaxModifiedIndex(node))
+        return index
+
+    def onUpdate(self, update, etcdIdx):
+        if self.waitIndex is not None:
+            # This is already the result yielded by a watch operation
+            self.waitIndex = self.getMaxModifiedIndex(update) + 1
+        else:
+            self.waitIndex = etcdIdx + 1
+        config = copy.deepcopy(self.lastConfig)
+        config.update(decode_etcd_data(update))
+        if config != self.lastConfig:
+            self.coordinator.onConfigUpdate(copy.deepcopy(config))
+            self.lastConfig = config
+
+    def onFailure(self, reason):
+        log.error('failed: %s' % reason)
diff --git a/pybal/pybal.py b/pybal/pybal.py
index c10aeb7..9c5f83d 100755
--- a/pybal/pybal.py
+++ b/pybal/pybal.py
@@ -11,7 +11,7 @@
 
 import os, sys, signal, socket, random
 import logging
-from pybal import ipvs, util, config, instrumentation
+from pybal import ipvs, util, config, etcd, instrumentation
 
 from twisted.python import failure
 from twisted.internet import reactor, defer
diff --git a/pybal/version.py b/pybal/version.py
new file mode 100644
index 0000000..0e36f15
--- /dev/null
+++ b/pybal/version.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+"""
+  PyBal version data
+  ~~~~~~~~~~~~~~~~~~
+
+"""
+from __future__ import absolute_import
+
+
+__all__ = ('__version__', 'USER_AGENT_STRING')
+
+__version__ = '1.6'
+
+USER_AGENT_STRING = 'PyBal/%s' % __version__
diff --git a/requirements.txt b/requirements.txt
index 942371a..3eb29f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1 @@
 twisted
-mock
diff --git a/setup.py b/setup.py
index 0f6813b..fa534da 100644
--- a/setup.py
+++ b/setup.py
@@ -42,6 +42,9 @@
     ),
     requires=(
         'twisted',
+        'PyOpenSSL'
+    ),
+    tests_require=(
         'mock',
     ),
     test_suite='pybal.test',
diff --git a/tox.ini b/tox.ini
index 6d034ee..095c4e4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -6,6 +6,7 @@
 deps =
   twisted
   mock
+  PyOpenSSL
 
 [testenv:cover]
 commands =

-- 
To view, visit https://gerrit.wikimedia.org/r/225649
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ia6d898df142ebd17384ff50beadf63e0a2bbcfca
Gerrit-PatchSet: 13
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to