http://www.mediawiki.org/wiki/Special:Code/MediaWiki/94156

Revision: 94156
Author:   mark
Date:     2011-08-10 14:57:26 +0000 (Wed, 10 Aug 2011)
Log Message:
-----------
* Implement Ganglia gmond Python plugin; htcpseqcheck can now run standalone or as part of gmond
* Move more common functions out of htcpseqcheck and udpmcast into util.py
* Implement sliding counts for percentages

Modified Paths:
--------------
    trunk/udpmcast/htcpseqcheck.py
    trunk/udpmcast/udpmcast.py
    trunk/udpmcast/util.py

Added Paths:
-----------
    trunk/udpmcast/htcpseqcheck.pyconf
    trunk/udpmcast/htcpseqcheck_ganglia.py

Modified: trunk/udpmcast/htcpseqcheck.py
===================================================================
--- trunk/udpmcast/htcpseqcheck.py      2011-08-10 14:23:25 UTC (rev 94155)
+++ trunk/udpmcast/htcpseqcheck.py      2011-08-10 14:57:26 UTC (rev 94156)
@@ -6,7 +6,10 @@
 #
 # $Id$
 
-import socket, getopt, sys, pwd, grp, struct
+import util
+import socket, getopt, sys, pwd, grp, struct, threading
+
+from util import debug
     
 from datetime import datetime, timedelta
 from collections import deque
@@ -18,16 +21,11 @@
     
 # Globals
 
-debugging = False
 sourcebuf = {}
-totalcounts = Counter()
+totalcounts, slidingcounts = Counter(), Counter()
+slidingdeque = deque()
+stats_lock = threading.Lock()
 
-def debug(msg):
-    global debugging
-    if debugging:
-        print >> sys.stderr, "DEBUG:", msg
-
-
 class RingBuffer(deque):
     """
     Implements TCP window like behavior
@@ -106,56 +104,60 @@
 
         checkhtcpseq(diagram, srcaddr[0])
 
+def update_sliding_counts(counts, maxlen=10000):
+    "Implements a sliding window of counts"
+    global slidingdeque, slidingcounts
+    
+    slidingcounts += counts
+    slidingdeque.append(counts)
+    
+    if len(slidingdeque) > maxlen:
+        slidingcounts -= slidingdeque.popleft()   
+
 def checkhtcpseq(diagram, srcaddr):
-    global sourcebuf, totalcounts
+    global sourcebuf, totalcounts, slidingcounts, stats_lock
 
     transid = struct.unpack('!I', diagram[8:12])[0]
 
-    sb = sourcebuf.setdefault(srcaddr, RingBuffer())
-    try:
-        counts = sb.add(transid)
-    except IndexError:
-        pass
-    else:
-        totalcounts.update(counts)
-        if counts['lost']:
-            # Lost packets
-            print "%d lost packet(s) from %s, last id %d" % (counts['lost'], srcaddr, transid)
-        elif counts['ancient']:
-            print "Ancient packet from %s, id %d" % (srcaddr, transid)
-        
-        if counts['lost'] and sb.counts['dequeued']:
-            print "%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %s" % (
-                sb.counts['lost'],
-                sb.counts['dequeued'],
-                float(sb.counts['lost'])*100/sb.counts['dequeued'],
-                sb.counts['outoforder'],
-                sb.counts['dups'],
-                sb.counts['ancient'],
-                sb.counts['received'],
-                srcaddr)
-            print "Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %d sources" % (
-                totalcounts['lost'],
-                totalcounts['dequeued'],
-                float(totalcounts['lost'])*100/totalcounts['dequeued'],
-                totalcounts['outoforder'],
-                totalcounts['dups'],
-                totalcounts['ancient'],
-                totalcounts['received'],
-                len(sourcebuf.keys()))
+    with stats_lock:    # Critical section
+        sb = sourcebuf.setdefault(srcaddr, RingBuffer())
+        try:
+            counts = sb.add(transid)
+        except IndexError:
+            pass
+        else:
+            totalcounts.update(counts)
+            update_sliding_counts(counts)
+            
+            # Don't bother printing stats if sys.stdout is set to None
+            if not sys.stdout: return
+            
+            if counts['lost']:
+                # Lost packets
+                print "%d lost packet(s) from %s, last id %d" % (counts['lost'], srcaddr, transid)
+            elif counts['ancient']:
+                print "Ancient packet from %s, id %d" % (srcaddr, transid)
+            
+            if counts['lost'] and sb.counts['dequeued']:
+                print "%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %s" % (
+                    sb.counts['lost'],
+                    sb.counts['dequeued'],
+                    float(sb.counts['lost'])*100/sb.counts['dequeued'],
+                    sb.counts['outoforder'],
+                    sb.counts['dups'],
+                    sb.counts['ancient'],
+                    sb.counts['received'],
+                    srcaddr)
+                print "Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %d sources" % (
+                    slidingcounts['lost'],
+                    slidingcounts['dequeued'],
+                    float(slidingcounts['lost'])*100/slidingcounts['dequeued'],
+                    totalcounts['outoforder'],
+                    totalcounts['dups'],
+                    totalcounts['ancient'],
+                    totalcounts['received'],
+                    len(sourcebuf.keys()))
 
-def join_multicast_group(sock, multicast_group):
-    import struct
-
-    ip_mreq = struct.pack('!4sl', socket.inet_aton(multicast_group),
-        socket.INADDR_ANY)
-    sock.setsockopt(socket.IPPROTO_IP,
-                    socket.IP_ADD_MEMBERSHIP,
-                    ip_mreq)
-
-    # We do not want to see our own messages back
-    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
-
 def print_help():
     print 'Usage:\n\thtcpseqcheck [ options ]\n'
     print 'Options:'
@@ -191,7 +193,7 @@
         elif option == '-g':
             group = value
         elif option == '-v':
-            debugging = True
+            util.debugging = True
 
     try:
         # Change uid and gid
@@ -204,23 +206,25 @@
 
         # Become a daemon
         if daemon:
-            from util import createDaemon
-            createDaemon()
+            util.createDaemon()
 
-        # Open the UDP socket
-        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        sock.bind((host, portnr))
+        sock = util.open_htcp_socket(host, portnr)
         
         # Join a multicast group if requested
         if multicast_group is not None:
             debug('Joining multicast group ' + multicast_group)
-            join_multicast_group(sock, multicast_group)
+            util.join_multicast_group(sock, multicast_group)
 
-        # Multiplex everything that comes in
+        # Start receiving HTCP packets
         receive_htcp(sock)
     except socket.error, msg:
         print msg[1];
     except KeyboardInterrupt:
         pass
 
+
+# Ganglia gmond module support
+try:
+    from htcpseqcheck_ganglia import metric_init, metric_cleanup
+except ImportError:
+    pass
\ No newline at end of file

Added: trunk/udpmcast/htcpseqcheck.pyconf
===================================================================
--- trunk/udpmcast/htcpseqcheck.pyconf                          (rev 0)
+++ trunk/udpmcast/htcpseqcheck.pyconf  2011-08-10 14:57:26 UTC (rev 94156)
@@ -0,0 +1,61 @@
+# Ganglia Python gmond module configuration file
+
+modules {
+  module {
+    name = "htcpseqcheck"
+    language = "python"
+
+       param multicast_group {
+         value = "239.128.0.112"
+       }
+       
+       param port {
+         value = 4827
+       }
+  }
+}
+
+collection_group {
+  collect_every = 15
+  time_threshold = 15
+
+  metric {
+    name = "htcp_losspct"
+    title = "HTCP packet loss percentage"
+  }
+
+  metric {
+    name = "htcp_dequeued"
+    title = "Dequeued HTCP packets"
+  }
+
+  metric {
+    name = "htcp_dups"
+    title = "Duplicate HTCP packets"
+  }
+
+  metric {
+    name = "htcp_ancient"
+    title = "Ancient HTCP packets"
+  }
+
+  metric {
+    name = "htcp_received"
+    title = "Received HTCP packets"
+  }
+
+  metric {
+    name = "htcp_sources"
+    title = "Unique HTCP senders"
+  }
+
+  metric {
+    name = "htcp_lost"
+    title = "Lost HTCP packets"
+  }
+
+  metric {
+    name = "htcp_outoforder"
+    title = "HTCP packets received out-of-order"
+  }
+}
\ No newline at end of file

Added: trunk/udpmcast/htcpseqcheck_ganglia.py
===================================================================
--- trunk/udpmcast/htcpseqcheck_ganglia.py                              (rev 0)
+++ trunk/udpmcast/htcpseqcheck_ganglia.py      2011-08-10 14:57:26 UTC (rev 94156)
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+
+# htcpseqcheck_ganglia.py
+# Ganglia gmond module integration
+
+import htcpseqcheck, util
+import threading, sys, socket, datetime
+
+from util import debug
+
+# Globals
+metrics = {}
+
+class HTCPSeqCheckThread(threading.Thread):
+    
+    name = "HTCPSeqCheck"
+    daemon = True
+    
+    def run(self, kwargs={}):
+        try:
+            sock = util.open_htcp_socket(kwargs.get('host', ""), kwargs.get('port', 4827))
+        
+            # Join a multicast group if requested
+            if 'multicast_group' in kwargs:
+                debug('Joining multicast group ' + kwargs['multicast_group'])
+                util.join_multicast_group(sock, kwargs['multicast_group'])
+
+            # Set sys.stdout to None; ganglia will do so anyway, and we
+            # can detect this in htcpseqcheck.
+
+            # Start receiving HTCP packets
+            htcpseqcheck.receive_htcp(sock)
+        except socket.error, msg:
+            print >> sys.stderr, msg[1]
+            sys.exit(1)
+
+def build_metrics_dict():
+    "Builds a dict of metric parameter dicts"
+
+    metrics = {
+        'htcp_losspct':  {
+            'value_type':   "float",
+            'units':        "%",
+            'format':       "%.2f",
+            'slope':        "both",
+            'description':  "HTCP packet loss percentage",
+            'int_name':     None,
+        },
+        'htcp_lost': {
+            'value_type':   "uint",
+            'units':        "packets/s",
+            'format':       "%u",
+            'slope':        "positive",
+            'description':  "Lost HTCP packets",
+            'int_name':     "lost",
+        },
+        'htcp_dequeued': {
+            'value_type':   "uint",
+            'units':        "packets/s",
+            'format':       "%u",
+            'slope':        "positive",
+            'description':  "Dequeued HTCP packets",
+            'int_name':     "dequeued",
+        },
+        'htcp_outoforder': {
+            'value_type':   "uint",
+            'units':        "packets/s",
+            'format':       "%u",
+            'slope':        "positive",
+            'description':  "HTCP packets received out-of-order",
+            'int_name':     "outoforder",
+        },
+        'htcp_dups': {
+            'value_type':   "uint",
+            'units':        "dups/s",
+            'format':       "%u",
+            'slope':        "positive",
+            'description':  "Duplicate HTCP packets",
+            'int_name':     "dups",
+        },
+        'htcp_ancient': {
+            'value_type':   "uint",
+            'units':        "packets/s",
+            'format':       "%u",
+            'slope':        "positive",
+            'description':  "Ancient HTCP packets",
+            'int_name':     "ancient",
+        },
+        'htcp_received': {
+            'value_type':   "uint",
+            'units':        "packets/s",
+            'format':       "%u",
+            'slope':        "positive",
+            'description':  "Received HTCP packets",
+            'int_name':     "received",
+        },
+        'htcp_sources':  {
+            'value_type':   "uint",
+            'units':        "sources",
+            'format':       "%u",
+            'slope':        "both",
+            'description':  "Unique HTCP senders",
+            'int_name':     None,
+        }
+    }
+    
+    # Add common values
+    for metricname, metric in metrics.iteritems():
+        metric.update({
+            'name': metricname,
+            'call_back': metric_handler,
+            'time_max': 15,
+            'groups': "htcp"
+        })
+    
+    return metrics
+
+def metric_init(params):
+    # gmond module initialization
+    global metrics
+    
+    # Start HTCP metrics collection in a separate thread
+    HTCPSeqCheckThread().start()
+
+    metrics = build_metrics_dict() 
+    return list(metrics.values())
+
+def metric_cleanup(params):
+    pass
+
+def metric_handler(name):
+    global metrics, silenceTime
+
+    metric = metrics[name]
+    
+    try:
+        with htcpseqcheck.stats_lock:   # Critical section
+            if name == "htcp_losspct":
+                return float(htcpseqcheck.slidingcounts['lost']) / htcpseqcheck.slidingcounts['dequeued'] * 100
+            elif name == "htcp_sources":
+                return len(htcpseqcheck.sourcebuf)
+            else:
+                return htcpseqcheck.totalcounts[metric['int_name']]
+    except:
+        return None
+
+if __name__ == '__main__':
+    for metric in build_metrics_dict().itervalues():
+        print "  metric {\n    name = \"%(name)s\"\n    title = \"%(description)s\"\n  }\n" % metric
\ No newline at end of file


Property changes on: trunk/udpmcast/htcpseqcheck_ganglia.py
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/udpmcast/udpmcast.py
===================================================================
--- trunk/udpmcast/udpmcast.py  2011-08-10 14:23:25 UTC (rev 94155)
+++ trunk/udpmcast/udpmcast.py  2011-08-10 14:57:26 UTC (rev 94156)
@@ -6,15 +6,11 @@
 #
 # $Id$
 
+import util
 import socket, getopt, sys, pwd, grp
 
-debugging = False
+from util import debugging
 
-def debug(msg):
-    global debugging
-    if debugging:
-        print msg;
-
 def multicast_diagrams(sock, addrrules):
     portnr = sock.getsockname()[1];
 
@@ -88,7 +84,7 @@
             elif option == '-g':
                 group = value
             elif option == '-v':
-                debugging = True
+                util.debugging = True
             elif option == '-t':
                 multicast_ttl = int(value)
 
@@ -101,25 +97,20 @@
             print "Error: Could not change uid or gid."
             sys.exit(-1)
 
+
         # Become a daemon
         if daemon:
-            from util import createDaemon
-            createDaemon()
+            util.createDaemon()
 
-        # Open the UDP socket
-        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        sock.bind((host, portnr))
-        
-        # Set the multicast TTL if requested
+        sock = util.open_htcp_socket(host, portnr)
+
         if multicast_ttl is not None:
-            sock.setsockopt(socket.IPPROTO_IP,
-                    socket.IP_MULTICAST_TTL,
-                    multicast_ttl)  
+            util.set_multicast_ttl(sock, multicast_ttl)
 
         # Join a multicast group if requested
         if multicast_group is not None:
             debug('Joining multicast group ' + multicast_group)
-            join_multicast_group(sock, multicast_group)
+            util.join_multicast_group(sock, multicast_group)
 
         # Parse the argument list
         addrrules = { 0: [] }

Modified: trunk/udpmcast/util.py
===================================================================
--- trunk/udpmcast/util.py      2011-08-10 14:23:25 UTC (rev 94155)
+++ trunk/udpmcast/util.py      2011-08-10 14:57:26 UTC (rev 94156)
@@ -1,8 +1,45 @@
 # util.py
 # utility functions shared by udpmcast and htcpseqcheck
 
-import os, signal
+import sys, os, signal, socket
 
+# Globals
+
+debugging = False
+
+def debug(msg):
+    global debugging
+    
+    if debugging:
+        print >> sys.stderr, "DEBUG:", msg
+        
+def open_htcp_socket(host="", portnr=4827):
+    # Open the UDP socket
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+    sock.bind((host, portnr))
+    
+    return sock
+
+def join_multicast_group(sock, multicast_group):
+    import struct
+
+    ip_mreq = struct.pack('!4sl', socket.inet_aton(multicast_group),
+        socket.INADDR_ANY)
+    sock.setsockopt(socket.IPPROTO_IP,
+                    socket.IP_ADD_MEMBERSHIP,
+                    ip_mreq)
+
+    # We do not want to see our own messages back
+    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
+
+def set_multicast_ttl(sock, ttl):
+    # Set the multicast TTL if requested
+    sock.setsockopt(socket.IPPROTO_IP,
+                    socket.IP_MULTICAST_TTL,
+                    ttl)  
+
+
 def createDaemon():
    """
    Detach a process from the controlling terminal and run it in the


_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to