Ori.livneh has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/74796


Change subject: Make Ganglia metric module configuration-aware
......................................................................

Make Ganglia metric module configuration-aware

The cost of implementing the metric module as a generic ZeroMQ publisher
monitor is that the streams have to be enumerated in the module's pyconf
file, and Puppet is poor at assembling a single file from multiple
resources. This patch refactors the metric module to discover streams
by scanning the EventLogging instance configuration files in
/etc/eventlogging.d (overridable via the config_dir module parameter).
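
The discovery step can be approximated outside gmond. The following
standalone sketch mirrors iter_pubs(). Like that function's regular
expression, it assumes a publisher is declared by a line beginning with
tcp://*:PORT. It rewrites the wildcard bind address to a loopback URI
keyed by the config file's name. The discover_publishers() name and the
sample config file are hypothetical.

```python
import os
import re
import shutil
import tempfile


def discover_publishers(config_dir):
    """Map config file names to loopback URIs (hypothetical helper).

    Approximates iter_pubs(): every line that starts with tcp://*:PORT
    is treated as a publisher bind. As with the dict built in
    metric_init(), a later match for the same file name replaces an
    earlier one.
    """
    pubs = {}
    for root, _, files in os.walk(config_dir):
        for fname in files:
            with open(os.path.join(root, fname)) as f:
                for line in f:
                    match = re.match(r'tcp://\*:(\d+)', line)
                    if match:
                        # Connect over loopback rather than the wildcard.
                        uri = match.expand(r'tcp://127.0.0.1:\g<1>')
                        pubs[fname] = uri
    return pubs


if __name__ == '__main__':
    config_dir = tempfile.mkdtemp()
    try:
        # Hypothetical instance config; the real file format is assumed.
        path = os.path.join(config_dir, 'client-generated-raw')
        with open(path, 'w') as f:
            f.write('tcp://*:8422\n')
        print(discover_publishers(config_dir))
    finally:
        shutil.rmtree(config_dir)
```

Files without a matching line contribute nothing, so unrelated files in
the directory are harmless.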

Change-Id: I93504edbee39ca0640fc01072d1abc686521cd4d
---
D ganglia/README.rst
A ganglia/conf.d/eventlogging_mon.pyconf
D ganglia/conf.d/zpubmon.pyconf
A ganglia/python_modules/eventlogging_mon.py
D ganglia/python_modules/zpubmon.py
5 files changed, 158 insertions(+), 200 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging refs/changes/96/74796/1

diff --git a/ganglia/README.rst b/ganglia/README.rst
deleted file mode 100644
index ea772b0..0000000
--- a/ganglia/README.rst
+++ /dev/null
@@ -1,27 +0,0 @@
-ZeroMQ PUB Monitor for Ganglia
-==============================
-
-This is a gmond metric-gathering module which reports a cumulative count
-of messages published by ZeroMQ publishers.
-
-Endpoints are specified as configuration parameters in zpubmon.pyconf.
-Each configuration key name is used as an endpoint name and its
-corresponding value as the endpoint's URI. The configuration param
-`groups` is special-cased: if present, its value specifies the group
-name for the metrics generated by the module. Otherwise the default
-group name ('ZeroMQ') is used.
-
-To test, invoke with one or more endpoint (name: URI) pairs specifying
-ZMQ publishers to poll. For example::
-
-  $ python zpubmon.py system-events tcp://localhost:8006
-
-Message counts will be logged to the console every five seconds.
-
-For more information about configuring Python modules for gmond, see the
-`official documentation <http://sourceforge.net/apps/trac/ganglia/wiki
-/ganglia_gmond_python_modules>`_.
-
-Copyright (c) 2012 by Ori Livneh <[email protected]>
-
-Licensed under the GNU General Public Licence, version 2.0 or later.
diff --git a/ganglia/conf.d/eventlogging_mon.pyconf b/ganglia/conf.d/eventlogging_mon.pyconf
new file mode 100644
index 0000000..8b80a16
--- /dev/null
+++ b/ganglia/conf.d/eventlogging_mon.pyconf
@@ -0,0 +1,18 @@
+# Ganglia metric module for EventLogging ZeroMQ publishers.
+# Counts messages per second. This file is managed by Puppet.
+modules {
+    module {
+        name = "eventlogging_mon"
+        language = "python"
+    }
+}
+
+collection_group {
+    collect_every = 10
+    time_threshold = 60
+    metric {
+        name_match = "zeromq_(.+)"
+        title = "\\1"
+        value_threshold = 0
+    }
+}
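
In the collection_group above, name_match selects this module's metrics
by regular expression. title = "\\1" names each graph after the first
capture group, which gmond presumably reads as the backreference \1
because the config file's strings use backslash escapes. As a result,
the zeromq_ prefix that metric_init() adds is dropped from the displayed
title. The effect can be approximated with Python's re module. This
mimics the result only and is not gmond's implementation. The metric
names below are hypothetical.

```python
import re

# Values from eventlogging_mon.pyconf, with the config-file escaping of
# the title ("\\1") already undone.
NAME_MATCH = r'zeromq_(.+)'
TITLE = r'\1'


def metric_title(metric_name):
    """Approximate the title gmond derives for a matched metric, else None."""
    match = re.search(NAME_MATCH, metric_name)
    if match is None:
        return None
    return match.expand(TITLE)


# Hypothetical metric names, built the way metric_init() does from the
# default 'zeromq_' prefix and a config file name.
for name in ('zeromq_server-generated-raw', 'zeromq_client-generated-valid',
             'cpu_user'):
    print('%s -> %s' % (name, metric_title(name)))
```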
diff --git a/ganglia/conf.d/zpubmon.pyconf b/ganglia/conf.d/zpubmon.pyconf
deleted file mode 100644
index 98bcad6..0000000
--- a/ganglia/conf.d/zpubmon.pyconf
+++ /dev/null
@@ -1,37 +0,0 @@
-# Sample configuration for zpubmon Ganglia module
-
-modules {
-  module {
-    name = "zpubmon"
-    language = "python"
-    param groups {
-        value = "EventLogging"
-    }
-    param server-generated-raw {
-        value = "tcp://127.0.0.1:8421"
-    }
-    param client-generated-raw {
-        value = "tcp://127.0.0.1:8422"
-    }
-    param client-generated-valid {
-        value = "tcp://127.0.0.1:8484"
-    }
-  }
-}
-
-collection_group {
-  collect_every = 10
-  time_threshold = 60
-  metric {
-      name = "server-generated-raw"
-      title = "Raw server-generated events"
-  }
-  metric {
-      name = "client-generated-raw"
-      title = "Raw client-generated events"
-  }
-  metric {
-      name = "client-generated-valid"
-      title = "Valid client-generated events"
-  }
-}
diff --git a/ganglia/python_modules/eventlogging_mon.py b/ganglia/python_modules/eventlogging_mon.py
new file mode 100644
index 0000000..2ac4518
--- /dev/null
+++ b/ganglia/python_modules/eventlogging_mon.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+  EventLogging Ganglia module
+  ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+  This is a gmond metric-gathering module which reports a cumulative
+  count of messages published by EventLogging ZeroMQ publishers.
+
+  :copyright: (c) 2013 by Ori Livneh <[email protected]>
+  :license: GNU General Public Licence 2.0 or later
+
+"""
+import errno
+import fileinput
+import logging
+import os
+import re
+import sys
+import threading
+import time
+
+import zmq
+
+
+logging.basicConfig(format='[ZMQ] %(asctime)s %(message)s', level=logging.INFO)
+
+defaults = {
+    'value_type': 'uint',
+    'format': '%d',
+    'units': 'events',
+    'slope': 'positive',
+    'time_max': 20,
+    'description': 'messages published',
+    'groups': 'EventLogging',
+}
+
+
+def iter_files(root):
+    """Recursively walk a file hierarchy, yielding each file's path."""
+    return (os.path.join(d, f) for d, _, fs in os.walk(root) for f in fs)
+
+
+def iter_pubs(config_dir):
+    """Discover local EventLogging publishers as (file name, URI) pairs."""
+    for line in fileinput.input(iter_files(config_dir)):
+        match = re.match(r'tcp://\*:(\d+)', line)
+        if match:
+            name = os.path.basename(fileinput.filename())
+            yield name, match.expand(r'tcp://127.0.0.1:\g<1>')
+
+
+def monitor_pubs(endpoints, counters):
+    """
+    Count events streaming on a set of EventLogging publishers.
+
+    *endpoints* is a dict that maps human-readable endpoint names to
+    endpoint URIs. The names are used as metric names in Ganglia and
+    as keys in *counters*, whose values count the messages received.
+
+    """
+    ctx = zmq.Context.instance()
+    poller = zmq.Poller()
+
+    sockets = {}
+    for name, uri in endpoints.iteritems():
+        logging.info('Registering %s (%s).', name, uri)
+        socket = ctx.socket(zmq.SUB)
+        socket.connect(uri)
+        socket.setsockopt(zmq.SUBSCRIBE, '')
+        poller.register(socket, zmq.POLLIN)
+        sockets[socket] = name
+
+    while 1:
+        try:
+            for socket, _ in poller.poll():
+                socket.recv(zmq.NOBLOCK)
+                counters[sockets[socket]] += 1
+        except zmq.ZMQError as e:
+            # Calls interrupted by EINTR should be re-tried.
+            if e.errno == errno.EINTR:
+                continue
+            raise
+
+
+def metric_init(params):
+    """
+    Initialize metrics.
+
+    Recurses through *config_dir* (default: /etc/eventlogging.d) in
+    search of local EventLogging publishers, spawns a worker thread to
+    monitor them, and returns a list of metric descriptors.
+
+    """
+    prefix = params.get('prefix', 'zeromq_')
+    config_dir = params.get('config_dir', '/etc/eventlogging.d')
+    pubs = {prefix + k: v for k, v in iter_pubs(config_dir)}
+    counters = {k: 0 for k in pubs}
+
+    thread = threading.Thread(target=monitor_pubs, args=(pubs, counters))
+    thread.daemon = True
+    thread.start()
+
+    for _ in range(20):  # Wait up to 2s for the first message to arrive.
+        time.sleep(0.1)
+        if sum(counters.values()):
+            break
+
+    return [dict(defaults, name=p, call_back=counters.get) for p in pubs]
+
+
+def metric_cleanup():
+    """
+    Clean-up handler.
+
+    Gmond requires that this function be defined.
+
+    """
+    pass
+
+
+def self_test():
+    """
+    Perform self-test. Parses *argv* as KEY=VALUE metric parameters,
+    then polls and logs the message counts every five seconds.
+
+    """
+    params = dict(arg.split('=', 1) for arg in sys.argv[1:])
+    descriptors = metric_init(params)
+
+    while 1:
+        for descriptor in descriptors:
+            name = descriptor['name']
+            call_back = descriptor['call_back']
+            logging.info('%s: %s', name, call_back(name))
+        time.sleep(5)
+
+
+if __name__ == '__main__':
+    self_test()
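
gmond drives a Python module through metric_init() and each
descriptor's call_back. It calls metric_init(params) once and then
periodically calls descriptor['call_back'](descriptor['name']). Because
call_back here is counters.get, every poll simply reads the running
total that the worker thread keeps incrementing. The sketch below
reproduces that pattern without ZeroMQ. The fake_monitor() message
source, the 'streams' parameter, the stream name, and the polling loop
standing in for gmond are all hypothetical.

```python
import threading
import time

# Abbreviated copy of the module's descriptor defaults.
DEFAULTS = {'value_type': 'uint', 'format': '%d', 'slope': 'positive'}


def fake_monitor(names, counters, rounds, delay):
    """Hypothetical stand-in for monitor_pubs(): no sockets involved.

    Each round "receives" one message per stream. Only this worker
    thread writes to *counters*; the callbacks only read it.
    """
    for _ in range(rounds):
        time.sleep(delay)
        for name in names:
            counters[name] += 1


def metric_init(params):
    """Stripped-down counterpart of eventlogging_mon.metric_init()."""
    # 'streams' is a hypothetical parameter; the real module discovers
    # stream names from the files under config_dir instead.
    names = ['zeromq_' + s for s in params['streams'].split(',')]
    counters = dict((name, 0) for name in names)

    worker = threading.Thread(target=fake_monitor,
                              args=(names, counters, 5, 0.01))
    worker.daemon = True
    worker.start()

    # call_back is counters.get, so call_back(name) returns the running
    # total for that metric.
    return [dict(DEFAULTS, name=name, call_back=counters.get)
            for name in names]


if __name__ == '__main__':
    # Minimal stand-in for gmond: initialise once, then poll callbacks.
    descriptors = metric_init({'streams': 'server-generated-raw'})
    time.sleep(0.5)
    for d in descriptors:
        print('%s: %s' % (d['name'], d['call_back'](d['name'])))
```

Sharing a plain dict is the same choice eventlogging_mon makes. It stays
simple because a single thread writes while gmond only reads.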
diff --git a/ganglia/python_modules/zpubmon.py b/ganglia/python_modules/zpubmon.py
deleted file mode 100644
index 1983800..0000000
--- a/ganglia/python_modules/zpubmon.py
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-  ZeroMQ PUB Monitor for Ganglia
-  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-  This is a gmond metric-gathering module which reports a cumulative
-  count of messages published by ZeroMQ publishers.
-
-  To test, invoke with one or more pairs of (endpoint name, endpoint
-  URI) pairs specifying ZMQ publishers to poll. For example:
-
-    $ python zpubmon.py system-events tcp://localhost:8006
-
-  See README for more details.
-
-  :copyright: (c) 2012 by Ori Livneh <[email protected]>
-  :license: GNU General Public Licence 2.0 or later
-
-"""
-import errno
-import logging
-import sys
-import threading
-import time
-
-import zmq
-
-
-logging.basicConfig(format='[ZMQ] %(asctime)s %(message)s', level=logging.INFO)
-
-
-def zmq_pub_mon(endpoints, counter):
-    """
-    Measure throughput of ZeroMQ publishers.
-
-    *endpoints* is a dict that maps human-readable endpoint names to
-    endpoint URIs. The names are used as metric names in Ganglia and
-    as the ZMQ_IDENTITY of the underlying socket.
-
-    """
-    ctx = zmq.Context.instance()
-    poller = zmq.Poller()
-
-    for name, uri in endpoints.iteritems():
-        logging.info('Registering %s (%s).', name, uri)
-        sock = ctx.socket(zmq.SUB)
-        sock.setsockopt(zmq.IDENTITY, name)
-        sock.connect(uri)
-        sock.setsockopt(zmq.SUBSCRIBE, '')
-        poller.register(sock, zmq.POLLIN)
-
-    while 1:
-        try:
-            for socket, _ in poller.poll():
-                socket.recv(zmq.NOBLOCK)
-                name = socket.getsockopt(zmq.IDENTITY)
-                counter[name] += 1
-        except zmq.ZMQError as e:
-            # Calls interrupted by EINTR should be re-tried.
-            if e.errno == errno.EINTR:
-                continue
-            raise
-
-
-def metric_init(params):
-    """
-    Initialize metrics.
-
-    Gmond invokes this method with a dict of arguments specified in
-    zpubmon.py. If *params* contains a `groups` key, its value is used
-    as the group name in Ganglia (in lieu of the default 'ZeroMQ').
-    Other items are interpreted as (name: URI) pairs of ZeroMQ endpoints
-    to monitor.
-
-    `metric_init` spawns a worker thread to monitor these endpoints and
-    returns a list of metric descriptors.
-
-    """
-    groups = params.pop('groups', 'ZeroMQ')
-    counter = {name: 0 for name in params}
-
-    thread = threading.Thread(target=zmq_pub_mon, args=(params, counter))
-    thread.daemon = True
-    thread.start()
-
-    return [{
-        'name': name,
-        'value_type': 'uint',
-        'format': '%d',
-        'units': 'events',
-        'slope': 'positive',
-        'time_max': 20,
-        'description': 'messages published',
-        'groups': groups,
-        'call_back': counter.get,
-    } for name in params]
-
-
-def metric_cleanup():
-    """
-    Clean-up handler.
-
-    Gmond requires that this function be defined.
-
-    """
-    pass
-
-
-def self_test():
-    """
-    Perform self-test.
-
-    Parses *argv* as a collection of (name, URI) pairs specifying ZeroMQ
-    publishers to be monitored. Message counts are polled and outputted
-    every five seconds.
-
-    """
-    params = dict(zip(sys.argv[1::2], sys.argv[2::2]))
-    if not params:
-        print 'Usage: %s NAME URI [NAME URI, ...]' % sys.argv[0]
-        print 'Example: %s my-zmq-stream tcp://localhost:8006' % sys.argv[0]
-        sys.exit(1)
-
-    descriptors = metric_init(params)
-
-    while 1:
-        for descriptor in descriptors:
-            name = descriptor['name']
-            call_back = descriptor['call_back']
-            logging.info('%s: %s', name, call_back(name))
-        time.sleep(5)
-
-
-if __name__ == '__main__':
-    self_test()
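
The two self_test() functions in this diff parse the command line
differently. The removed zpubmon.py read alternating NAME URI
arguments. eventlogging_mon.py reads KEY=VALUE metric parameters such as
prefix or config_dir. Splitting each argument on the first '=' only
keeps values that themselves contain '=' intact. The following is a
small sketch of both parsing styles. The function names and sample
arguments are hypothetical.

```python
def parse_pairs(argv):
    """Old zpubmon.py style: NAME URI [NAME URI ...]."""
    return dict(zip(argv[0::2], argv[1::2]))


def parse_params(argv):
    """eventlogging_mon.py style: KEY=VALUE [KEY=VALUE ...].

    maxsplit=1 splits on the first '=' only, so a value may contain '='.
    """
    return dict(arg.split('=', 1) for arg in argv)


if __name__ == '__main__':
    # Hypothetical command lines, with the program name already stripped.
    print(parse_pairs(['system-events', 'tcp://localhost:8006']))
    print(parse_params(['prefix=el_', 'config_dir=/etc/eventlogging.d']))
```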

-- 
To view, visit https://gerrit.wikimedia.org/r/74796
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I93504edbee39ca0640fc01072d1abc686521cd4d
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ori.livneh <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
