Giuseppe Lavagetto has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/365549 )
Change subject: [WiP] Re-write etcd driver
......................................................................

[WiP] Re-write etcd driver

* Use treq to make requests
* Query the DNS for SRV records
* Reschedule the configurationObserver for every failure

What is missing:
- Initial url parsing
- tests

Change-Id: I6c48e622cddf783a2594e9d2c36f83e773773b35
---
M pybal/etcd.py
1 file changed, 200 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal refs/changes/49/365549/1

diff --git a/pybal/etcd.py b/pybal/etcd.py
index 9067faa..929329e 100644
--- a/pybal/etcd.py
+++ b/pybal/etcd.py
@@ -10,9 +10,13 @@
 
 import copy
 import json
+import random
 import urllib
 
+import treq
+
 from twisted.internet import defer, reactor, ssl
+from twisted.names import client as dnsclient, error as dnserror
 from twisted.python import failure
 from twisted.web import error
 from twisted.web.client import HTTPClientFactory
@@ -97,6 +101,202 @@
         self.transport.loseConnection()
 
 
+class EtcdConfigurationObserverNew(ConfigurationObserver):
+    """
+    A new version of the etcd-based configurationobserver.
+
+    Resolves the etcd host to use from a SRV record, and uses
+    treq to fetch data/watch etcd.
+
+    The configuration URL has the form etcd2://<srv record>/<key path>.
+    The first request fetches the whole key recursively; every following
+    request long-polls etcd for the next change after the last seen index.
+    Any failure resets the state and restarts observing after
+    `retryInterval` seconds.
+    """
+    urlScheme = 'etcd2://'
+    scheme = 'https'
+    # Seconds to wait before starting over after any failure.
+    retryInterval = 5
+
+    def __init__(self, coordinator, configUrl):
+        self.coordinator = coordinator
+        location = configUrl
+        if location.startswith(self.urlScheme):
+            location = location[len(self.urlScheme):]
+        # "<srv record>/<key path>"; the key path may be omitted.
+        self.srvRecord, _, self.key = location.partition('/')
+        # Next etcd index to wait for; None means "fetch a full snapshot".
+        self.waitIndex = None
+        # "host:port" of the etcd server picked from the SRV record.
+        self.etcdHost = None
+        self.lastConfig = {}
+        self.resolver = dnsclient.Resolver('/etc/resolv.conf')
+
+    def dnsFailure(self, reason):
+        """
+        Log DNS resolution errors, then let the failure propagate.
+
+        Non-DNS failures are re-raised by trap() untouched. Either way the
+        failure continues down the errback chain, so that the fetch step is
+        skipped and onFailure() resets the state and schedules a retry.
+        """
+        reason.trap(dnserror.DomainError, dnserror.AuthoritativeDomainError,
+                    dnserror.DNSFormatError, dnserror.DNSNameError,
+                    dnserror.DNSQueryRefusedError,
+                    dnserror.DNSQueryTimeoutError, dnserror.DNSServerError,
+                    dnserror.DNSUnknownError)
+        log.error('Could not resolve SRV record for %s: %s' %
+                  (self.srvRecord, reason), system="config-etcd")
+        # TODO: raise an alert
+        return reason
+
+    def getHost(self):
+        """
+        Get host to connect to.
+
+        Returns a Deferred firing with the "host:port" of the etcd server:
+        the previously picked one if any, otherwise one resolved from the
+        SRV record.
+        """
+        if self.etcdHost is not None:
+            return defer.succeed(self.etcdHost)
+        d = self.resolver.lookupService(self.srvRecord)
+        d.addCallback(self._pickHost)
+        d.addErrback(self.dnsFailure)
+        return d
+
+    def _pickHost(self, records):
+        """
+        Pick a random target among the most preferred SRV answers.
+
+        records is the (answers, authority, additional) tuple returned by
+        lookupService(). Raises ValueError if there are no answers.
+        """
+        answers = records[0]
+        if not answers:
+            # TODO: raise an alert, stop announcing BGP?
+            raise ValueError('Could not find any SRV record for %s' %
+                             self.srvRecord)
+        # RFC 2782: targets with the lowest priority value are preferred.
+        best = min(rr.payload.priority for rr in answers)
+        hosts = ['%s:%d' % (rr.payload.target, rr.payload.port)
+                 for rr in answers if rr.payload.priority == best]
+        self.etcdHost = random.choice(hosts)
+        return self.etcdHost
+
+    def startObserving(self):
+        """Main loop watching etcd for changes"""
+        d = self.getHost()
+        d.addCallback(self._fetch)
+        d.addErrback(self.onFailure)
+        return d
+
+    def _buildUrl(self):
+        """Return the etcd v2 keys API URL of the observed key."""
+        return '%s://%s/v2/keys/%s' % (self.scheme, self.etcdHost,
+                                        self.key.lstrip('/'))
+
+    def _fetch(self, _ignored=None):
+        """
+        Issue one request to etcd, then re-arm it once processed.
+
+        Without a waitIndex the whole key is fetched; otherwise the request
+        blocks until etcd has a change at or after waitIndex. The argument
+        is ignored so this can be used directly as a Deferred callback.
+        """
+        # Directories need recursive=true to return (and notify about)
+        # their children.
+        params = {'recursive': 'true'}
+        if self.waitIndex is not None:
+            params['wait'] = 'true'
+            params['waitIndex'] = str(self.waitIndex)
+        d = treq.get(self._buildUrl(), params=params)
+        d.addCallback(self._handleResponse)
+        # _fetch returns None, so each request gets a fresh Deferred chain
+        # instead of nesting chains forever.
+        d.addCallback(self._fetch)
+        d.addErrback(self.onFailure)
+
+    def _handleResponse(self, response):
+        """Decode the JSON body of response and hand it to onUpdate()."""
+        d = treq.json_content(response)
+        d.addCallback(self._checkResponse, response)
+        return d
+
+    def _checkResponse(self, data, response):
+        """
+        Raise twisted.web.error.Error for a non-200 etcd response, e.g.
+        when waitIndex has already been cleared from etcd's history.
+        Otherwise apply data via onUpdate().
+        """
+        if response.code != 200:
+            raise error.Error(response.code, str(data))
+        self.onUpdate(response, data)
+
+    def onUpdate(self, response, data):
+        """
+        Actions taken when an update is received.
+
+        data is the decoded JSON body of response. When no waitIndex was
+        set (first request, or first one after a failure) it is a full
+        snapshot that replaces the previous configuration; otherwise it is
+        a single change merged into it. The coordinator is only notified
+        when the resulting configuration differs from the previous one.
+        """
+        isSnapshot = self.waitIndex is None
+        config = {} if isSnapshot else copy.deepcopy(self.lastConfig)
+
+        # Read new data
+        new_data = decode_etcd_data(data)
+        config.update(new_data)
+
+        # Remove deleted/inactive nodes
+        for k, v in new_data.items():
+            if v is None:
+                config.pop(k, None)
+
+        # Now update pybal config
+        if config != self.lastConfig:
+            self.coordinator.onConfigUpdate(copy.deepcopy(config))
+            self.lastConfig = config
+
+        self._updateWaitIndex(response, data)
+
+    def onFailure(self, reason):
+        """Log reason, reset all state and start observing again later."""
+        log.error('failed: %s' % reason, system="config-etcd")
+        # We failed, so for a good measure we reset everything
+        self.waitIndex = None
+        self.etcdHost = None
+        # Then we start fresh
+        reactor.callLater(self.retryInterval, self.startObserving)
+
+    def _updateWaitIndex(self, response, data):
+        """
+        Move waitIndex past the data just received.
+
+        After a full fetch, X-Etcd-Index (the cluster index when the
+        request was served) is used. For a watch event that header is the
+        index at which the watch started, so the highest modifiedIndex in
+        the event is used instead.
+        """
+        if self.waitIndex is None:
+            etcdIndex = response.headers.getRawHeaders('X-Etcd-Index')
+            if etcdIndex:
+                self.waitIndex = int(etcdIndex[0]) + 1
+                return
+        self.waitIndex = self._getMaxModifiedIndex(data) + 1
+
+    def _getMaxModifiedIndex(self, root):
+        """Return the highest modifiedIndex found in an etcd node tree."""
+        node = root.get('node', root)
+        index = node.get('modifiedIndex', 0)
+        for child in node.get('nodes', ()):
+            index = max(index, self._getMaxModifiedIndex(child))
+        return index
+
+
 class EtcdConfigurationObserver(ConfigurationObserver, HTTPClientFactory):
     """A factory that will continuously monitor an etcd key for changes."""
 
-- 
To view, visit https://gerrit.wikimedia.org/r/365549
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6c48e622cddf783a2594e9d2c36f83e773773b35
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: 2.0-dev
Gerrit-Owner: Giuseppe Lavagetto <glavage...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits