Ioana Lasc has proposed merging ~ilasc/turnip:add-statsd-metrics into turnip:master.
Commit message: Send maxrss, stime and utime metrics to statsd Requested reviews: Launchpad code reviewers (launchpad-reviewers) For more details, see: https://code.launchpad.net/~ilasc/turnip/+git/turnip/+merge/391552 This MP is based on Colin's suggestion in MP 391365 and his patch for inter-process communication. I added the "send to statsd/Telegraf" code, the formatting and tags for the metrics, a singleton for the statsd client, and a mock for unit tests. I tested the StatsdGitClient connection locally with Telegraf and Chronograf, but this branch still needs work and unit tests. This is a "please review direction" MP, thanks Colin! -- Your team Launchpad code reviewers is requested to review the proposed merge of ~ilasc/turnip:add-statsd-metrics into turnip:master.
diff --git a/config.yaml b/config.yaml index 99b0e7b..f7d8612 100644 --- a/config.yaml +++ b/config.yaml @@ -21,3 +21,6 @@ openid_provider_root: https://testopenid.test/ site_name: git.launchpad.test main_site_root: https://launchpad.test/ celery_broker: pyamqp://guest@localhost// +statsd_host: launchpad.test +statsd_port: 8125 +statsd_prefix: lp.turnip diff --git a/packbackendserver.tac b/packbackendserver.tac index 191ac16..e8157cc 100644 --- a/packbackendserver.tac +++ b/packbackendserver.tac @@ -20,7 +20,10 @@ from twisted.scripts.twistd import ServerOptions from turnip.config import config from turnip.log import RotatableFileLogObserver -from turnip.pack.git import PackBackendFactory +from turnip.pack.git import ( + PackBackendFactory, + StatsdGitClient, + ) from turnip.pack.hookrpc import ( HookRPCHandler, HookRPCServerFactory, @@ -37,11 +40,14 @@ def getPackBackendServices(): hookrpc_path = config.get('hookrpc_path') or repo_store hookrpc_sock_path = os.path.join( hookrpc_path, 'hookrpc_sock_%d' % pack_backend_port) + statsd_client = StatsdGitClient(config.get('statsd_host'), config.get('statsd_port') + config.get('statsd_prefix')) pack_backend_service = internet.TCPServer( pack_backend_port, PackBackendFactory(repo_store, hookrpc_handler, - hookrpc_sock_path)) + hookrpc_sock_path, + statsd_client)) if os.path.exists(hookrpc_sock_path): os.unlink(hookrpc_sock_path) hookrpc_service = internet.UNIXServer( diff --git a/requirements.txt b/requirements.txt index fe3f1ac..ac502e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -58,6 +58,7 @@ scandir==1.10.0 setuptools-scm==1.17.0 simplejson==3.6.5 six==1.15.0 +statsd==3.3.0 testscenarios==0.5.0 testtools==2.4.0 traceback2==1.4.0 diff --git a/setup.py b/setup.py index 5cfae76..6bc2e94 100755 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ requires = [ 'pygit2>=0.27.4,<0.28.0', 'python-openid2', 'PyYAML', + 'statsd', 'Twisted[conch]', 'waitress', 'zope.interface', diff --git a/turnip/pack/git.py 
b/turnip/pack/git.py index 4407085..8998462 100644 --- a/turnip/pack/git.py +++ b/turnip/pack/git.py @@ -11,9 +11,13 @@ from __future__ import ( __metaclass__ = type +import json +import os.path import uuid - import six +import socket +import statsd +import threading from twisted.internet import ( defer, error, @@ -47,6 +51,32 @@ VIRT_ERROR_PREFIX = b'turnip virt error: ' SAFE_PARAMS = frozenset([b'host', b'version']) +class ThreadSafeSingleton(type): + _instances = {} + _singleton_lock = threading.Lock() + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + with cls._singleton_lock: + if cls not in cls._instances: + cls._instances[cls] = super( + ThreadSafeSingleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class StatsdGitClient(): + __metaclass__ = ThreadSafeSingleton + + def __init__(self, *args, **kwargs): + self.host = args[0] + self.port = args[1] + self.prefix = args[2] + self.client = statsd.StatsClient(self.host, self.port, self.prefix) + + def get_client(self): + return self.client + + class RequestIDLogger(Logger): def emit(self, level, format=None, **kwargs): @@ -235,13 +265,15 @@ class PackServerProtocol(PackProxyProtocol): return auth_params -class GitProcessProtocol(protocol.ProcessProtocol): +class GitProcessProtocol(protocol.ProcessProtocol, object): _err_buffer = b'' + _resource_usage_buffer = b'' def __init__(self, peer): self.peer = peer self.out_started = False + self.client = self.peer.factory.statsd_client.get_client() def connectionMade(self): self.peer.setPeer(self) @@ -250,6 +282,12 @@ class GitProcessProtocol(protocol.ProcessProtocol): UnstoppableProducerWrapper(self.peer.transport), True) self.peer.resumeProducing() + def childDataReceived(self, childFD, data): + if childFD == 3: + self.resourceUsageReceived(data) + else: + super(GitProcessProtocol, self).childDataReceived(childFD, data) + def outReceived(self, data): self.out_started = True self.peer.sendRawData(data) @@ -259,6 +297,10 @@ 
class GitProcessProtocol(protocol.ProcessProtocol): # process is done. self._err_buffer += data + def resourceUsageReceived(self, data): + # Just store it up so we can deal with it when the process is done. + self._resource_usage_buffer += data + def outConnectionLost(self): if self._err_buffer: # Originally we'd always return stderr as an ERR packet for @@ -299,6 +341,35 @@ class GitProcessProtocol(protocol.ProcessProtocol): code=code) self.peer.sendPacket(ERROR_PREFIX + 'backend exited %d' % code) self.peer.processEnded(reason) + if self._resource_usage_buffer: + try: + resource_usage = json.loads( + self._resource_usage_buffer.decode('UTF-8')) + + gauge_name = ("host={},repo={},operation={},metric=maxrss" + .format( + socket.gethostname(), + self.peer.raw_pathname, + self.peer.command)) + + self.client.gauge(gauge_name, resource_usage['maxrss']) + + gauge_name = ("host={},repo={},operation={},metric=stime" + .format( + socket.gethostname(), + self.peer.raw_pathname, + self.peer.command)) + self.client.gauge(gauge_name, resource_usage['stime']) + + gauge_name = ("host={},repo={},operation={},metric=utime" + .format( + socket.gethostname(), + self.peer.raw_pathname, + self.peer.command)) + self.client.gauge(gauge_name, resource_usage['utime']) + + except ValueError: + pass def pauseProducing(self): self.transport.pauseProducing() @@ -485,8 +556,9 @@ class PackBackendProtocol(PackServerProtocol): def spawnGit(self, subcmd, extra_args, write_operation=False, send_path_as_option=False, auth_params=None, cmd_env=None): - cmd = b'git' - args = [b'git'] + cmd = os.path.join( + os.path.dirname(__file__), 'git_helper.py').encode('UTF-8') + args = [cmd] if send_path_as_option: args.extend([b'-C', self.path]) args.append(subcmd) @@ -507,10 +579,12 @@ class PackBackendProtocol(PackServerProtocol): self.log.info('Spawning {args}', args=args) self.peer = GitProcessProtocol(self) - self.spawnProcess(cmd, args, env=env) + self.spawnProcess( + cmd, args, env=env, childFDs={0: 
"w", 1: "r", 2: "r", 3: "r"}) - def spawnProcess(self, cmd, args, env=None): - default_reactor.spawnProcess(self.peer, cmd, args, env=env) + def spawnProcess(self, cmd, args, env=None, childFDs=None): + default_reactor.spawnProcess( + self.peer, cmd, args, env=env, childFDs=childFDs) def expectNextCommand(self): """Enables this connection to receive the next command.""" @@ -620,10 +694,12 @@ class PackBackendFactory(protocol.Factory): def __init__(self, root, hookrpc_handler=None, - hookrpc_sock=None): + hookrpc_sock=None, + statsd_client=None): self.root = root self.hookrpc_handler = hookrpc_handler self.hookrpc_sock = hookrpc_sock + self.statsd_client = statsd_client class PackVirtServerProtocol(PackProxyServerProtocol): diff --git a/turnip/pack/git_helper.py b/turnip/pack/git_helper.py new file mode 100755 index 0000000..0142834 --- /dev/null +++ b/turnip/pack/git_helper.py @@ -0,0 +1,36 @@ +#!/usr/bin/python + +# Copyright 2020 Canonical Ltd. This software is licensed under the +# GNU Affero General Public License version 3 (see the file LICENSE). + +from __future__ import absolute_import, print_function, unicode_literals + +import fcntl +import json +import os +import resource +import subprocess +import sys + + +if __name__ == '__main__': + # We expect the caller to have opened FD 3, and will send information + # about git's resource usage there. Mark it close-on-exec so that the + # git child process can't accidentally interfere with it. + flags = fcntl.fcntl(3, fcntl.F_GETFD) + fcntl.fcntl(3, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) + + # Call git and wait for it to finish. + ret = subprocess.call(['git'] + sys.argv[1:]) + + # Dump resource usage information to FD 3. + resource_fd = os.fdopen(3, 'w') + rusage = resource.getrusage(resource.RUSAGE_CHILDREN) + resource_fd.write(json.dumps({ + "utime": rusage.ru_utime, + "stime": rusage.ru_stime, + "maxrss": rusage.ru_maxrss, + })) + + # Pass on git's exit status. 
+ sys.exit(ret) diff --git a/turnip/pack/tests/test_functional.py b/turnip/pack/tests/test_functional.py index c06c4e2..5ea77d3 100644 --- a/turnip/pack/tests/test_functional.py +++ b/turnip/pack/tests/test_functional.py @@ -39,7 +39,9 @@ import six from testscenarios.testcase import WithScenarios from testtools import TestCase from testtools.content import text_content -from testtools.deferredruntest import AsynchronousDeferredRunTest +from testtools.deferredruntest import ( + AsynchronousDeferredRunTestForBrokenTwisted, + ) from testtools.matchers import ( Equals, Is, @@ -76,12 +78,14 @@ from turnip.pack.tests.fake_servers import ( FakeAuthServerService, FakeVirtInfoService, ) +from turnip.pack.tests.test_helpers import MockStatsd from turnip.version_info import version_info class FunctionalTestMixin(WithScenarios): - run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=30) + run_tests_with = AsynchronousDeferredRunTestForBrokenTwisted.make_factory( + timeout=30) scenarios = [ ('v0 protocol', {"protocol_version": b"0"}), @@ -119,10 +123,12 @@ class FunctionalTestMixin(WithScenarios): # get confused on Python 2. 
self.root = tempfile.mkdtemp(prefix=b'turnip-test-root-') self.addCleanup(shutil.rmtree, self.root, ignore_errors=True) + self.statsd_client = MockStatsd() self.backend_listener = reactor.listenTCP( 0, PackBackendFactory( - self.root, self.hookrpc_handler, self.hookrpc_sock_path)) + self.root, self.hookrpc_handler, self.hookrpc_sock_path, + self.statsd_client)) self.backend_port = self.backend_listener.getHost().port self.addCleanup(self.backend_listener.stopListening) @@ -613,6 +619,7 @@ class FrontendFunctionalTestMixin(FunctionalTestMixin): 0, server.Site(self.authserver)) self.authserver_port = self.authserver_listener.getHost().port self.authserver_url = b'http://localhost:%d/' % self.authserver_port + self.addCleanup(self.authserver_listener.stopListening) # Run a backend server in a repo root containing an empty repo # for the path '/test'. @@ -629,16 +636,11 @@ class FrontendFunctionalTestMixin(FunctionalTestMixin): PackVirtFactory( b'localhost', self.backend_port, self.virtinfo_url, 15)) self.virt_port = self.virt_listener.getHost().port + self.addCleanup(self.virt_listener.stopListening) self.virtinfo.ref_permissions = { b'refs/heads/master': ['create', 'push']} @defer.inlineCallbacks - def tearDown(self): - super(FrontendFunctionalTestMixin, self).tearDown() - yield self.virt_listener.stopListening() - yield self.authserver_listener.stopListening() - - @defer.inlineCallbacks def test_read_only(self): self.virtinfo.ref_permissions = { b'refs/heads/master': ['create', 'push']} @@ -711,16 +713,12 @@ class TestGitFrontendFunctional(FrontendFunctionalTestMixin, TestCase): self.frontend_listener = reactor.listenTCP( 0, PackFrontendFactory(b'localhost', self.virt_port)) self.port = self.frontend_listener.getHost().port + self.addCleanup(self.frontend_listener.stopListening) # Always use a writable URL for now. 
self.url = b'git://localhost:%d/+rw/test' % self.port self.ro_url = b'git://localhost:%d/test' % self.port - @defer.inlineCallbacks - def tearDown(self): - yield super(TestGitFrontendFunctional, self).tearDown() - yield self.frontend_listener.stopListening() - class TestSmartHTTPFrontendFunctional(FrontendFunctionalTestMixin, TestCase): @@ -744,17 +742,13 @@ class TestSmartHTTPFrontendFunctional(FrontendFunctionalTestMixin, TestCase): })) self.frontend_listener = reactor.listenTCP(0, frontend_site) self.port = self.frontend_listener.getHost().port + self.addCleanup(self.frontend_listener.stopListening) # Always use a writable URL for now. self.url = b'http://localhost:%d/+rw/test' % self.port self.ro_url = b'http://localhost:%d/test' % self.port @defer.inlineCallbacks - def tearDown(self): - yield super(TestSmartHTTPFrontendFunctional, self).tearDown() - yield self.frontend_listener.stopListening() - - @defer.inlineCallbacks def test_root_revision_header(self): response = yield client.Agent(reactor).request( b'HEAD', b'http://localhost:%d/' % self.port) diff --git a/turnip/pack/tests/test_git.py b/turnip/pack/tests/test_git.py index d1a8162..599c5f6 100644 --- a/turnip/pack/tests/test_git.py +++ b/turnip/pack/tests/test_git.py @@ -35,6 +35,7 @@ from turnip.pack import ( helpers, ) from turnip.pack.tests.fake_servers import FakeVirtInfoService +from turnip.pack.tests.test_helpers import MockStatsd from turnip.pack.tests.test_hooks import MockHookRPCHandler from turnip.tests.compat import mock @@ -116,7 +117,7 @@ class DummyPackBackendProtocol(git.PackBackendProtocol): test_process = None - def spawnProcess(self, cmd, args, env=None): + def spawnProcess(self, cmd, args, env=None, childFDs=None): if self.test_process is not None: raise AssertionError('Process already spawned.') self.test_process = (cmd, args, env) @@ -178,9 +179,11 @@ class TestPackBackendProtocol(TestCase): super(TestPackBackendProtocol, self).setUp() self.root = self.useFixture(TempDir()).path 
self.hookrpc_handler = MockHookRPCHandler() + self.statsd_client = MockStatsd() self.hookrpc_sock = os.path.join(self.root, 'hookrpc_sock') self.factory = git.PackBackendFactory( - self.root, self.hookrpc_handler, self.hookrpc_sock) + self.root, self.hookrpc_handler, + self.hookrpc_sock, self.statsd_client) self.proto = DummyPackBackendProtocol() self.proto.factory = self.factory self.transport = testing.StringTransportWithDisconnection() @@ -196,6 +199,9 @@ class TestPackBackendProtocol(TestCase): self.addCleanup(self.virtinfo_listener.stopListening) self.setupConfig() + self.git_helper = os.path.join( + os.path.dirname(git.__file__), 'git_helper.py').encode('UTF-8') + def setupConfig(self): config.defaults['virtinfo_endpoint'] = self.virtinfo_url # Force timeout to be a string to make sure we are casting it @@ -229,7 +235,7 @@ class TestPackBackendProtocol(TestCase): [('foo.git', )], self.virtinfo.confirm_repo_creation_call_args) self.assertEqual( - (b'git', [b'git', b'upload-pack', full_path], { + (self.git_helper, [self.git_helper, b'upload-pack', full_path], { 'GIT_PROTOCOL': 'version=0' }), self.proto.test_process) @@ -266,8 +272,8 @@ class TestPackBackendProtocol(TestCase): b'git-upload-pack', b'/foo.git', {b'host': b'example.com'}) full_path = os.path.join(six.ensure_binary(self.root), b'foo.git') self.assertEqual( - (b'git', - [b'git', b'upload-pack', full_path], + (self.git_helper, + [self.git_helper, b'upload-pack', full_path], {'GIT_PROTOCOL': 'version=0'}), self.proto.test_process) @@ -280,8 +286,11 @@ class TestPackBackendProtocol(TestCase): b'git-receive-pack', b'/foo.git', {b'host': b'example.com'}) self.assertThat( self.proto.test_process, MatchesListwise([ - Equals(b'git'), - Equals([b'git', b'receive-pack', repo_dir.encode('utf-8')]), + Equals(self.git_helper), + Equals([ + self.git_helper, b'receive-pack', + repo_dir.encode('utf-8'), + ]), ContainsDict( {b'TURNIP_HOOK_RPC_SOCK': Equals(self.hookrpc_sock)})])) @@ -296,10 +305,10 @@ class 
TestPackBackendProtocol(TestCase): self.proto.packetReceived(b'HEAD refs/heads/master') self.assertThat( self.proto.test_process, MatchesListwise([ - Equals(b'git'), + Equals(self.git_helper), Equals([ - b'git', b'-C', repo_dir.encode('utf-8'), b'symbolic-ref', - b'HEAD', b'refs/heads/master']), + self.git_helper, b'-C', repo_dir.encode('utf-8'), + b'symbolic-ref', b'HEAD', b'refs/heads/master']), ContainsDict( {b'TURNIP_HOOK_RPC_SOCK': Equals(self.hookrpc_sock)})])) diff --git a/turnip/pack/tests/test_helpers.py b/turnip/pack/tests/test_helpers.py index ff484e8..4ebfa58 100644 --- a/turnip/pack/tests/test_helpers.py +++ b/turnip/pack/tests/test_helpers.py @@ -32,6 +32,8 @@ from turnip.pack.helpers import ( ) import turnip.pack.hooks from turnip.version_info import version_info +from zope.interface import implementer +from zope.interface import Interface TEST_DATA = b'0123456789abcdef' TEST_PKT = b'00140123456789abcdef' @@ -339,3 +341,73 @@ class TestCapabilityAdvertisement(TestCase): self.assertEqual( turnip_capabilities, git_advertised_capabilities.replace(git_agent, turnip_agent)) + + +class IStats(Interface): + def incr(self, key=None): + """ + increment a key + + :param key: the key to increment + :return: nothing + """ + + def decr(self, key=None): + """ + decrement a key + + :param key: the key to decrement + :return: nothing + """ + + def timing(self, key=None, ms=None): + """ + record an execution time for this key + + :param key: the key to report for + :param ms: the timing in milliseconds + :return: nothing + """ + + def gauge(self, key=None, value=None): + """ + gauge a value + + :param key: the key to gauge for + :param value: the gauged value + :return: nothing + """ + + +@implementer(IStats) +class MockStatsd(): + def __init__(self): + self.vals = dict() + self.timings = dict() + + def get_instance(self): + return self + + def incr(self, key=None): + if key not in self.vals: + self.vals[key] = 1 + else: + self.vals[key] += 1 + + def decr(self, 
key=None): + if key not in self.vals: + self.vals[key] = -1 + else: + self.vals[key] -= 1 + + def timing(self, key=None, ms=None): + self.timings[key] = ms + + def gauge(self, key=None, value=None): + self.vals[key] = value + + def set(self, key=None, value=None): + self.vals[key] = value + + def get_client(self): + return self diff --git a/turnipserver.py b/turnipserver.py index 8627392..0f50e3c 100644 --- a/turnipserver.py +++ b/turnipserver.py @@ -17,6 +17,7 @@ from turnip.pack.git import ( PackBackendFactory, PackFrontendFactory, PackVirtFactory, + StatsdGitClient, ) from turnip.pack.hookrpc import ( HookRPCHandler, @@ -37,6 +38,9 @@ REPO_STORE = config.get('repo_store') HOOKRPC_PATH = config.get('hookrpc_path') or REPO_STORE VIRTINFO_ENDPOINT = config.get('virtinfo_endpoint') VIRTINFO_TIMEOUT = int(config.get('virtinfo_timeout')) +STATSD_HOST = config.get('statsd_host') +STATSD_PORT = config.get('statsd_port') +STATSD_PREFIX = config.get('statsd_prefix') # turnipserver.py is preserved for convenience in development, services # in production are run in separate processes. @@ -48,11 +52,15 @@ VIRTINFO_TIMEOUT = int(config.get('virtinfo_timeout')) hookrpc_handler = HookRPCHandler(VIRTINFO_ENDPOINT, VIRTINFO_TIMEOUT) hookrpc_sock_path = os.path.join( HOOKRPC_PATH, 'hookrpc_sock_%d' % PACK_BACKEND_PORT) + +statsd_client = StatsdGitClient(STATSD_HOST, STATSD_PORT, STATSD_PREFIX) + reactor.listenTCP( PACK_BACKEND_PORT, PackBackendFactory(REPO_STORE, hookrpc_handler, - hookrpc_sock_path)) + hookrpc_sock_path, + statsd_client)) if os.path.exists(hookrpc_sock_path): os.unlink(hookrpc_sock_path) reactor.listenUNIX(hookrpc_sock_path, HookRPCServerFactory(hookrpc_handler))
_______________________________________________ Mailing list: https://launchpad.net/~launchpad-reviewers Post to : [email protected] Unsubscribe : https://launchpad.net/~launchpad-reviewers More help : https://help.launchpad.net/ListHelp

