jenkins-bot has submitted this change and it was merged.
Change subject: Add EtcdConfigurationObserver
......................................................................
Add EtcdConfigurationObserver
Add a simple etcd client implementation that can watch a key for changes, using
the etcd API's wait / waitIndex parameters to ensure consistency.
Change-Id: Ia6d898df142ebd17384ff50beadf63e0a2bbcfca
---
M pybal/__init__.py
A pybal/etcd.py
M pybal/pybal.py
A pybal/version.py
M requirements.txt
M setup.py
M tox.ini
7 files changed, 170 insertions(+), 4 deletions(-)
Approvals:
Giuseppe Lavagetto: Looks good to me, approved
jenkins-bot: Verified
diff --git a/pybal/__init__.py b/pybal/__init__.py
index e838db5..ade8bc3 100644
--- a/pybal/__init__.py
+++ b/pybal/__init__.py
@@ -6,9 +6,8 @@
"""
import test
-__version__ = '1.6'
+from .version import *
-USER_AGENT_STRING = 'PyBal/%s' % __version__
__all__ = ('ipvs', 'monitor', 'pybal', 'util', 'monitors', 'bgp',
'config', 'instrumentation', 'USER_AGENT_STRING')
diff --git a/pybal/etcd.py b/pybal/etcd.py
new file mode 100644
index 0000000..eee4896
--- /dev/null
+++ b/pybal/etcd.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+"""
+ PyBal etcd client
+ ~~~~~~~~~~~~~~~~~
+
+ This module allows PyBal to be configured via etcd.
+
+"""
+from __future__ import absolute_import
+
+import copy
+import json
+import urllib
+
+from twisted.internet import defer, reactor, ssl
+from twisted.python import failure
+from twisted.web import error
+from twisted.web.client import HTTPClientFactory
+from twisted.web.http import HTTPClient, urlparse
+
+from .config import ConfigurationObserver
+from .version import USER_AGENT_STRING
+from .util import log
+
+def decode_node(node):
+ """Decode an individual node from an etcd response."""
+ key = node['key'].rsplit('/', 1)[-1]
+ value = json.loads(node['value'])
+ pooled = value.pop('pooled', None)
+ if pooled is not None:
+ value['enabled'] = pooled == 'yes'
+ return key, value
+
+
+def decode_etcd_data(data):
+ """Simplify an etcd response by stripping leading path components
+ from key names, decoding JSON values, and removing etcd metadata."""
+ node = data['node']
+ if node.get('dir'):
+ return dict(decode_node(child) for child in node['nodes'])
+ else:
+ key, value = decode_node(node)
+ return {key: value}
+
+
+class EtcdClient(HTTPClient):
+ """Represents a client for the etcd HTTP API."""
+ etcdIndex = 0
+
+ def connectionMade(self):
+ self.sendCommand('GET', self.factory.getPath())
+ self.sendHeader('Host', self.factory.host)
+ self.sendHeader('User-Agent', self.factory.agent)
+ self.endHeaders()
+
+ def handleStatus(self, version, status, message):
+ self.version = version
+ self.status = status
+ self.message = message
+
+ def handleResponse(self, response):
+ if self.status != '200':
+ err = error.Error(self.status, self.message, response)
+ self.factory.onFailure(failure.Failure(err))
+ else:
+ try:
+ config = json.loads(response)
+ except Exception as e:
+ self.factory.onFailure(failure.Failure())
+ else:
+ self.factory.onUpdate(config, self.etcdIndex)
+ self.transport.loseConnection()
+
+ def handleHeader(self, key, val):
+ if key == 'X-Etcd-Index':
+ self.etcdIndex = int(val)
+
+ def timeout(self):
+ err = defer.TimeoutError(
+ 'Retrieving key %s from %s took longer than %s seconds.' %
+ (self.factory.key, self.factory.host, self.factory.timeout))
+ self.factory.onFailure(err)
+ self.transport.loseConnection()
+
+
+class EtcdConfigurationObserver(ConfigurationObserver, HTTPClientFactory):
+ """A factory that will continuously monitor an etcd key for changes."""
+
+ urlScheme = 'etcd://'
+
+ agent = USER_AGENT_STRING
+ method = 'GET'
+ protocol = EtcdClient
+ scheme = 'https'
+ timeout = 0
+ followRedirect = False
+ afterFoundGet = False
+
+ def __init__(self, coordinator, configUrl):
+ self.coordinator = coordinator
+ self.configUrl = configUrl
+ self.host, self.port, self.key = self.parseConfigUrl(configUrl)
+ self.waitIndex = None
+ self.lastConfig = {}
+
+ def startObserving(self):
+ """Start (or re-start) watching the configuration file for changes."""
+ reactor.connectSSL(self.host, self.port, self,
+ ssl.ClientContextFactory())
+
+ def parseConfigUrl(self, configUrl):
+ parsed = urlparse(configUrl)
+ return parsed.hostname, parsed.port or 2379, parsed.path
+
+ def getPath(self):
+ path = '/v2/keys/%s' % self.key.lstrip('/')
+ params = {'recursive': 'true'}
+ if self.waitIndex is not None:
+ params['waitIndex'] = self.waitIndex
+ params['wait'] = 'true'
+ path = '%s?%s' % (path, urllib.urlencode(params))
+ return path
+
+ def clientConnectionFailed(self, connector, reason):
+ connector.connect()
+
+ def clientConnectionLost(self, connector, unused_reason):
+ connector.connect()
+
+ def getMaxModifiedIndex(self, root):
+ root = root.get('node', root)
+ index = root['modifiedIndex']
+ for node in root.get('nodes', ()):
+ index = max(index, self.getMaxModifiedIndex(node))
+ return index
+
+ def onUpdate(self, update, etcdIdx):
+ if self.waitIndex is not None:
+ # This is already the result yielded by a watch operation
+ self.waitIndex = self.getMaxModifiedIndex(update) + 1
+ else:
+ self.waitIndex = etcdIdx + 1
+ config = copy.deepcopy(self.lastConfig)
+ config.update(decode_etcd_data(update))
+ if config != self.lastConfig:
+ self.coordinator.onConfigUpdate(copy.deepcopy(config))
+ self.lastConfig = config
+
+ def onFailure(self, reason):
+ log.error('failed: %s' % reason)
diff --git a/pybal/pybal.py b/pybal/pybal.py
index c10aeb7..9c5f83d 100755
--- a/pybal/pybal.py
+++ b/pybal/pybal.py
@@ -11,7 +11,7 @@
import os, sys, signal, socket, random
import logging
-from pybal import ipvs, util, config, instrumentation
+from pybal import ipvs, util, config, etcd, instrumentation
from twisted.python import failure
from twisted.internet import reactor, defer
diff --git a/pybal/version.py b/pybal/version.py
new file mode 100644
index 0000000..0e36f15
--- /dev/null
+++ b/pybal/version.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+"""
+ PyBal version data
+ ~~~~~~~~~~~~~~~~~~
+
+"""
+from __future__ import absolute_import
+
+
+__all__ = ('__version__', 'USER_AGENT_STRING')
+
+__version__ = '1.6'
+
+USER_AGENT_STRING = 'PyBal/%s' % __version__
diff --git a/requirements.txt b/requirements.txt
index 942371a..3eb29f0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1 @@
twisted
-mock
diff --git a/setup.py b/setup.py
index 0f6813b..fa534da 100644
--- a/setup.py
+++ b/setup.py
@@ -42,6 +42,9 @@
),
requires=(
'twisted',
+ 'PyOpenSSL'
+ ),
+ tests_require=(
'mock',
),
test_suite='pybal.test',
diff --git a/tox.ini b/tox.ini
index 6d034ee..095c4e4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -6,6 +6,7 @@
deps =
twisted
mock
+ PyOpenSSL
[testenv:cover]
commands =
--
To view, visit https://gerrit.wikimedia.org/r/225649
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ia6d898df142ebd17384ff50beadf63e0a2bbcfca
Gerrit-PatchSet: 13
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>
Gerrit-Reviewer: Giuseppe Lavagetto <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits