Giuseppe Lavagetto has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/365549 )

Change subject: [WiP] Re-write etcd driver
......................................................................

[WiP] Re-write etcd driver

* Use treq to make requests
* Query the DNS for SRV records
* Reschedule the configurationObserver after every failure

What is missing:
- Initial url parsing
- tests

Change-Id: I6c48e622cddf783a2594e9d2c36f83e773773b35
---
M pybal/etcd.py
1 file changed, 116 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal 
refs/changes/49/365549/1

diff --git a/pybal/etcd.py b/pybal/etcd.py
index 9067faa..929329e 100644
--- a/pybal/etcd.py
+++ b/pybal/etcd.py
@@ -10,9 +10,13 @@
 
 import copy
 import json
+import random
 import urllib
 
+import treq
+
 from twisted.internet import defer, reactor, ssl
+from twisted.names import client as dnsclient, error as dnserror
 from twisted.python import failure
 from twisted.web import error
 from twisted.web.client import HTTPClientFactory
@@ -97,6 +101,172 @@
         self.transport.loseConnection()
 
 
+class EtcdConfigurationObserverNew(ConfigurationObserver):
+    """
+    A new version of the etcd-based configurationobserver.
+
+    Resolves the etcd host to use from a SRV record, and uses
+    treq to fetch data/watch etcd (v2 keys API).
+
+    Every failure (DNS or HTTP) is logged and startObserving is
+    re-scheduled after retryDelay seconds.
+    """
+    urlScheme = 'etcd2://'
+    scheme = 'https'
+    # Seconds to wait before starting over after a failure
+    retryDelay = 5
+
+    def __init__(self, coordinator, configUrl):
+        """
+        coordinator -- object whose onConfigUpdate() receives new configs
+        configUrl   -- etcd2://<SRV record name>[/<etcd key>]
+        """
+        self.coordinator = coordinator
+        # Minimal split of SRV name and key. TODO: proper url parsing
+        location = configUrl.replace(self.urlScheme, '')
+        self.srvRecord, _, self.key = location.partition('/')
+        self.waitIndex = None
+        self.etcdHost = None
+        self.lastConfig = {}
+        self.resolver = dnsclient.Resolver('/etc/resolv.conf')
+
+    @property
+    def _url(self):
+        """URL of the watched key on the currently selected etcd host."""
+        return '%s://%s/v2/keys/%s' % (self.scheme, self.etcdHost, self.key)
+
+    def dnsFailure(self, reason):
+        """Log a failed SRV lookup and retry; other failures propagate."""
+        # trap() re-raises anything not listed, so no result check is needed
+        reason.trap(dnserror.DomainError, dnserror.AuthoritativeDomainError,
+                    dnserror.DNSFormatError, dnserror.DNSNameError,
+                    dnserror.DNSQueryRefusedError,
+                    dnserror.DNSQueryTimeoutError, dnserror.DNSServerError,
+                    dnserror.DNSUnknownError)
+        log.error('Could not resolve SRV record for %s: %s' %
+                  (self.srvRecord, reason), system="config-etcd")
+        # TODO: raise an alert
+        reactor.callLater(self.retryDelay, self.startObserving)
+
+    def getHost(self):
+        """
+        Get host to connect to.
+
+        Returns a Deferred that fires once the lookup has been attempted;
+        self.etcdHost is still None if it failed.
+        """
+        if self.etcdHost is not None:
+            # A bare Deferred() would never fire and stall the main loop
+            return defer.succeed(self.etcdHost)
+        d = self.resolver.lookupService(self.srvRecord)
+        d.addCallback(self._pickHost)
+        d.addErrback(self.dnsFailure)
+        return d
+
+    def _pickHost(self, records):
+        """Store a random host:port from the SRV answers, or retry later."""
+        answers = records[0]
+        if not answers:
+            log.error('Could not find any SRV record for %s' %
+                      self.srvRecord, system="config-etcd")
+            # TODO: raise an alert, stop announcing BGP?
+            reactor.callLater(self.retryDelay, self.startObserving)
+            return
+        # str(payload) is the record's debug representation, not an address
+        # NOTE(review): assumes all answers are SRV records - confirm
+        hosts = ['%s:%d' % (x.payload.target, x.payload.port)
+                 for x in answers]
+        self.etcdHost = random.choice(hosts)
+
+    def startObserving(self):
+        """Main loop watching etcd for changes"""
+        d = self.getHost()
+        d.addCallback(self._fetch)
+        # Catch whatever dnsFailure did not trap, so the loop never dies
+        d.addErrback(self.onFailure)
+
+    def _fetch(self, _=None):
+        """Issue the next GET; the callback chain result is ignored."""
+        if self.etcdHost is None:
+            # Lookup failed: a retry has already been scheduled
+            return
+        # NOTE(review): recursive assumes the key is a directory - confirm
+        params = {'recursive': 'true'}
+        if self.waitIndex is not None:
+            # waitIndex already is "last seen index + 1": adding 1 again
+            # would skip an event. Without an index, do a plain read.
+            params.update(wait='true', waitIndex=self.waitIndex)
+        d = treq.get(self._url, params=params)
+        d.addCallback(self.onUpdate)
+        # After the config is updated, fetch data again.
+        d.addCallback(self._fetch)
+        d.addErrback(self.onFailure)
+
+    def onUpdate(self, response, d=None):
+        """
+        Actions taken when an update is received.
+
+        d is unused and only kept for backwards compatibility.
+        """
+        if response.code != 200:
+            raise error.Error(str(response.code))
+        # treq's json() returns a Deferred; returning it makes the caller's
+        # chain wait for the decoded body
+        return response.json().addCallback(self._applyUpdate, response)
+
+    def _applyUpdate(self, data, response):
+        """Merge the decoded etcd response into the pybal configuration."""
+        self._updateWaitIndex(response, data)
+
+        config = copy.deepcopy(self.lastConfig)
+
+        # Read new data
+        new_data = decode_etcd_data(data)
+        config.update(new_data)
+
+        # Remove deleted/inactive nodes
+        for k, v in new_data.items():
+            if v is None:
+                config.pop(k, None)
+
+        # Now update pybal config
+        if config != self.lastConfig:
+            self.coordinator.onConfigUpdate(copy.deepcopy(config))
+            self.lastConfig = config
+
+    def onFailure(self, reason):
+        """Log the failure, forget host and index, then start over."""
+        log.error('failed: %s' % reason, system="config-etcd")
+        # We failed, so for a good measure we reset everything
+        self.waitIndex = None
+        self.etcdHost = None
+        # Then we start fresh
+        reactor.callLater(self.retryDelay, self.startObserving)
+
+    def _updateWaitIndex(self, response, root=None):
+        """
+        Set waitIndex to the index following the last one seen.
+
+        root is the decoded JSON body of response.
+        """
+        raw = None
+        if self.waitIndex is None:
+            # twisted's Headers supports neither 'in' nor item access
+            raw = response.headers.getRawHeaders('X-Etcd-Index')
+        if raw:
+            self.waitIndex = int(raw[0]) + 1
+        elif root is not None:
+            self.waitIndex = self._getMaxModifiedIndex(root) + 1
+
+    def _getMaxModifiedIndex(self, root):
+        """Return the highest modifiedIndex in the tree rooted at root."""
+        root = root.get('node', root)
+        index = root['modifiedIndex']
+        for node in root.get('nodes', ()):
+            index = max(index, self._getMaxModifiedIndex(node))
+        return index
+
+
 class EtcdConfigurationObserver(ConfigurationObserver, HTTPClientFactory):
     """A factory that will continuously monitor an etcd key for changes."""
 

-- 
To view, visit https://gerrit.wikimedia.org/r/365549
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6c48e622cddf783a2594e9d2c36f83e773773b35
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: 2.0-dev
Gerrit-Owner: Giuseppe Lavagetto <glavage...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to