http://www.mediawiki.org/wiki/Special:Code/MediaWiki/94156
Revision: 94156
Author: mark
Date: 2011-08-10 14:57:26 +0000 (Wed, 10 Aug 2011)
Log Message:
-----------
* Implement Ganglia gmond python plugin; htcpseqcheck can now run standalone or
as part of gmond
* Move more common functions out of htcpseqcheck and udpmcast into util.py
* Implement sliding counts for percentages
Modified Paths:
--------------
trunk/udpmcast/htcpseqcheck.py
trunk/udpmcast/udpmcast.py
trunk/udpmcast/util.py
Added Paths:
-----------
trunk/udpmcast/htcpseqcheck.pyconf
trunk/udpmcast/htcpseqcheck_ganglia.py
Modified: trunk/udpmcast/htcpseqcheck.py
===================================================================
--- trunk/udpmcast/htcpseqcheck.py 2011-08-10 14:23:25 UTC (rev 94155)
+++ trunk/udpmcast/htcpseqcheck.py 2011-08-10 14:57:26 UTC (rev 94156)
@@ -6,7 +6,10 @@
#
# $Id$
-import socket, getopt, sys, pwd, grp, struct
+import util
+import socket, getopt, sys, pwd, grp, struct, threading
+
+from util import debug
from datetime import datetime, timedelta
from collections import deque
@@ -18,16 +21,11 @@
# Globals
-debugging = False
sourcebuf = {}
-totalcounts = Counter()
+# totalcounts: cumulative counts, updated on every checkhtcpseq() call.
+# slidingcounts/slidingdeque: counts of only the most recent calls,
+# maintained by update_sliding_counts().
+# stats_lock: held while these globals and sourcebuf are updated or read;
+# the gmond plugin (htcpseqcheck_ganglia) reads them while the receiving
+# loop runs in a separate thread.
+totalcounts, slidingcounts = Counter(), Counter()
+slidingdeque = deque()
+stats_lock = threading.Lock()
-def debug(msg):
- global debugging
- if debugging:
- print >> sys.stderr, "DEBUG:", msg
-
-
class RingBuffer(deque):
"""
Implements TCP window like behavior
@@ -106,56 +104,60 @@
checkhtcpseq(diagram, srcaddr[0])
+def update_sliding_counts(counts, maxlen=10000):
+    """
+    Implements a sliding window of counts.
+
+    Adds `counts` (the result of one RingBuffer.add() call) to
+    slidingcounts and remembers it in slidingdeque. Once the deque holds
+    more than `maxlen` entries, the oldest entries are removed and
+    subtracted again, so slidingcounts covers at most the last `maxlen`
+    calls. checkhtcpseq() calls this while holding stats_lock.
+    """
+    global slidingdeque, slidingcounts
+
+    slidingcounts += counts
+    slidingdeque.append(counts)
+
+    # Loop rather than a single check so the window is trimmed fully even
+    # if a caller passes a smaller maxlen than on earlier calls.
+    while len(slidingdeque) > maxlen:
+        slidingcounts -= slidingdeque.popleft()
def checkhtcpseq(diagram, srcaddr):
- global sourcebuf, totalcounts
+    global sourcebuf, totalcounts, slidingcounts, stats_lock
transid = struct.unpack('!I', diagram[8:12])[0]
- sb = sourcebuf.setdefault(srcaddr, RingBuffer())
- try:
- counts = sb.add(transid)
- except IndexError:
- pass
- else:
- totalcounts.update(counts)
- if counts['lost']:
- # Lost packets
- print "%d lost packet(s) from %s, last id %d" % (counts['lost'],
srcaddr, transid)
- elif counts['ancient']:
- print "Ancient packet from %s, id %d" % (srcaddr, transid)
-
- if counts['lost'] and sb.counts['dequeued']:
- print "%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d
ancient, %d received from %s" % (
- sb.counts['lost'],
- sb.counts['dequeued'],
- float(sb.counts['lost'])*100/sb.counts['dequeued'],
- sb.counts['outoforder'],
- sb.counts['dups'],
- sb.counts['ancient'],
- sb.counts['received'],
- srcaddr)
- print "Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d
ancient, %d received from %d sources" % (
- totalcounts['lost'],
- totalcounts['dequeued'],
- float(totalcounts['lost'])*100/totalcounts['dequeued'],
- totalcounts['outoforder'],
- totalcounts['dups'],
- totalcounts['ancient'],
- totalcounts['received'],
- len(sourcebuf.keys()))
+    # Update per-source and global counters for this packet, then
+    # optionally report. Everything runs under stats_lock because the gmond
+    # plugin reads the same globals from another thread.
+    with stats_lock: # Critical section
+        sb = sourcebuf.setdefault(srcaddr, RingBuffer())
+        try:
+            counts = sb.add(transid)
+        except IndexError:
+            pass
+        else:
+            totalcounts.update(counts)
+            update_sliding_counts(counts)
+
+            # Don't bother printing stats if sys.stdout is set to None
+            if not sys.stdout:
+                return
+
+            if counts['lost']:
+                # Lost packets
+                print "%d lost packet(s) from %s, last id %d" % (
+                    counts['lost'], srcaddr, transid)
+            elif counts['ancient']:
+                print "Ancient packet from %s, id %d" % (srcaddr, transid)
+
+            if counts['lost'] and sb.counts['dequeued']:
+                print ("%d/%d losses (%.2f%%), %d out-of-order, %d dups, "
+                       "%d ancient, %d received from %s") % (
+                    sb.counts['lost'],
+                    sb.counts['dequeued'],
+                    float(sb.counts['lost'])*100/sb.counts['dequeued'],
+                    sb.counts['outoforder'],
+                    sb.counts['dups'],
+                    sb.counts['ancient'],
+                    sb.counts['received'],
+                    srcaddr)
+                # The loss figures on the "Totals" line come from the sliding
+                # window; the remaining fields are cumulative. The check above
+                # only looks at this source's own dequeued count, so it does
+                # not guarantee slidingcounts['dequeued'] is non-zero: guard
+                # the division separately.
+                if slidingcounts['dequeued']:
+                    print ("Totals: %d/%d losses (%.2f%%), %d out-of-order, "
+                           "%d dups, %d ancient, %d received from %d sources") % (
+                        slidingcounts['lost'],
+                        slidingcounts['dequeued'],
+                        float(slidingcounts['lost'])*100/slidingcounts['dequeued'],
+                        totalcounts['outoforder'],
+                        totalcounts['dups'],
+                        totalcounts['ancient'],
+                        totalcounts['received'],
+                        len(sourcebuf.keys()))
-def join_multicast_group(sock, multicast_group):
- import struct
-
- ip_mreq = struct.pack('!4sl', socket.inet_aton(multicast_group),
- socket.INADDR_ANY)
- sock.setsockopt(socket.IPPROTO_IP,
- socket.IP_ADD_MEMBERSHIP,
- ip_mreq)
-
- # We do not want to see our own messages back
- sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
-
def print_help():
print 'Usage:\n\thtcpseqcheck [ options ]\n'
print 'Options:'
@@ -191,7 +193,7 @@
elif option == '-g':
group = value
elif option == '-v':
- debugging = True
+ util.debugging = True
try:
# Change uid and gid
@@ -204,23 +206,25 @@
# Become a daemon
if daemon:
- from util import createDaemon
- createDaemon()
+ util.createDaemon()
- # Open the UDP socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((host, portnr))
+ sock = util.open_htcp_socket(host, portnr)
# Join a multicast group if requested
if multicast_group is not None:
debug('Joining multicast group ' + multicast_group)
- join_multicast_group(sock, multicast_group)
+ util.join_multicast_group(sock, multicast_group)
- # Multiplex everything that comes in
+ # Start receiving HTCP packets
receive_htcp(sock)
except socket.error, msg:
print msg[1];
except KeyboardInterrupt:
pass
+
+# Ganglia gmond module support: re-export the gmond hook functions so that
+# gmond can load this file directly as the "htcpseqcheck" module (the name
+# used in htcpseqcheck.pyconf). If htcpseqcheck_ganglia cannot be imported,
+# the hooks are simply absent. Note that any ImportError raised while
+# importing it (not only a missing module) is swallowed here.
+try:
+    from htcpseqcheck_ganglia import metric_init, metric_cleanup
+except ImportError:
+    pass
\ No newline at end of file
Added: trunk/udpmcast/htcpseqcheck.pyconf
===================================================================
--- trunk/udpmcast/htcpseqcheck.pyconf (rev 0)
+++ trunk/udpmcast/htcpseqcheck.pyconf 2011-08-10 14:57:26 UTC (rev 94156)
@@ -0,0 +1,61 @@
+# Ganglia Python gmond module configuration file
+
+modules {
+ module {
+ name = "htcpseqcheck"
+ language = "python"
+
+ param multicast_group {
+ value = "239.128.0.112"
+ }
+
+ param port {
+ value = 4827
+ }
+ }
+}
+
+collection_group {
+ collect_every = 15
+ time_threshold = 15
+
+ metric {
+ name = "htcp_losspct"
+ title = "HTCP packet loss percentage"
+ }
+
+ metric {
+ name = "htcp_dequeued"
+ title = "Dequeued HTCP packets"
+ }
+
+ metric {
+ name = "htcp_dups"
+ title = "Duplicate HTCP packets"
+ }
+
+ metric {
+ name = "htcp_ancient"
+ title = "Ancient HTCP packets"
+ }
+
+ metric {
+ name = "htcp_received"
+ title = "Received HTCP packets"
+ }
+
+ metric {
+ name = "htcp_sources"
+ title = "Unique HTCP senders"
+ }
+
+ metric {
+ name = "htcp_lost"
+ title = "Lost HTCP packets"
+ }
+
+ metric {
+ name = "htcp_outoforder"
+ title = "HTCP packets received out-of-order"
+ }
+}
\ No newline at end of file
Added: trunk/udpmcast/htcpseqcheck_ganglia.py
===================================================================
--- trunk/udpmcast/htcpseqcheck_ganglia.py (rev 0)
+++ trunk/udpmcast/htcpseqcheck_ganglia.py 2011-08-10 14:57:26 UTC (rev
94156)
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+
+# htcpseqcheck_ganglia.py
+# Ganglia gmond module integration
+
+import htcpseqcheck, util
+import threading, sys, socket, datetime
+
+from util import debug
+
+# Globals
+# Metric descriptor dicts keyed by metric name; filled in by metric_init()
+# and looked up by metric_handler().
+metrics = {}
+
+class HTCPSeqCheckThread(threading.Thread):
+    """
+    Background thread that opens the HTCP socket and runs
+    htcpseqcheck.receive_htcp(), so the counters read by metric_handler()
+    keep updating while gmond runs.
+    """
+
+    def __init__(self, kwargs=None):
+        """
+        kwargs: optional dict of socket parameters -- 'host', 'port' and
+        'multicast_group' (as given by gmond's module params). Missing keys
+        fall back to listening on all addresses, port 4827, no multicast.
+        """
+        threading.Thread.__init__(self, name="HTCPSeqCheck")
+        # Set through the attribute rather than as class attributes, which
+        # would only shadow Thread's name/daemon properties instead of going
+        # through their setters.
+        self.daemon = True
+        self.params = dict(kwargs or {})
+
+    def run(self, kwargs=None):
+        # Thread.start() never passes arguments to run(), so the parameters
+        # given to the constructor are used; an explicit dict is still
+        # accepted for callers that invoke run() directly.
+        params = self.params if kwargs is None else kwargs
+        try:
+            # gmond may hand parameter values over as strings
+            sock = util.open_htcp_socket(params.get('host', ""),
+                                         int(params.get('port', 4827)))
+
+            # Join a multicast group if requested
+            if params.get('multicast_group'):
+                debug('Joining multicast group ' + params['multicast_group'])
+                util.join_multicast_group(sock, params['multicast_group'])
+
+            # htcpseqcheck.checkhtcpseq() skips its console output when
+            # sys.stdout is None. This module does not set it; the original
+            # comment states gmond does -- NOTE(review): confirm.
+
+            # Start receiving HTCP packets
+            htcpseqcheck.receive_htcp(sock)
+        except socket.error, msg:
+            # sys.exit() in a non-main thread only ends this thread, so just
+            # report the error and let the thread finish.
+            print >> sys.stderr, msg[1]
+
+def build_metrics_dict():
+    """
+    Return a dict mapping each gmond metric name to its descriptor dict.
+
+    Besides its own fields, every descriptor carries the fields shared by
+    all metrics: its name, metric_handler as the callback, time_max 15 and
+    group "htcp". int_name is the htcpseqcheck counter key that
+    metric_handler reads, or None for metrics it computes specially.
+    """
+    # (name, value_type, units, format, slope, description, int_name)
+    table = (
+        ("htcp_losspct", "float", "%", "%.2f", "both",
+         "HTCP packet loss percentage", None),
+        ("htcp_lost", "uint", "packets/s", "%u", "positive",
+         "Lost HTCP packets", "lost"),
+        ("htcp_dequeued", "uint", "packets/s", "%u", "positive",
+         "Dequeued HTCP packets", "dequeued"),
+        ("htcp_outoforder", "uint", "packets/s", "%u", "positive",
+         "HTCP packets received out-of-order", "outoforder"),
+        ("htcp_dups", "uint", "dups/s", "%u", "positive",
+         "Duplicate HTCP packets", "dups"),
+        ("htcp_ancient", "uint", "packets/s", "%u", "positive",
+         "Ancient HTCP packets", "ancient"),
+        ("htcp_received", "uint", "packets/s", "%u", "positive",
+         "Received HTCP packets", "received"),
+        ("htcp_sources", "uint", "sources", "%u", "both",
+         "Unique HTCP senders", None),
+    )
+    fields = ('value_type', 'units', 'format', 'slope', 'description',
+              'int_name')
+
+    descriptors = {}
+    for row in table:
+        metricname = row[0]
+        descriptor = dict(zip(fields, row[1:]))
+        # Fields common to every metric
+        descriptor['name'] = metricname
+        descriptor['call_back'] = metric_handler
+        descriptor['time_max'] = 15
+        descriptor['groups'] = "htcp"
+        descriptors[metricname] = descriptor
+    return descriptors
+
+def metric_init(params):
+    """
+    gmond module initialization hook.
+
+    params holds the module's "param" values from htcpseqcheck.pyconf
+    (e.g. multicast_group, port); they are handed to the collector thread
+    so the configured socket settings are actually used. Returns the list
+    of metric descriptor dicts.
+    """
+    global metrics
+
+    # Start HTCP metrics collection in a separate thread
+    HTCPSeqCheckThread(kwargs=params).start()
+
+    metrics = build_metrics_dict()
+    return list(metrics.values())
+
+def metric_cleanup(params=None):
+    """
+    gmond module shutdown hook; this module performs no cleanup.
+
+    params is optional because gmond invokes metric_cleanup() without
+    arguments; a required parameter would make that call fail.
+    """
+    pass
+
+def metric_handler(name):
+    """
+    gmond callback: return the current value of the metric called `name`.
+
+    htcp_losspct is computed from the sliding-window counts and is None
+    while no packets have been dequeued in that window (the percentage is
+    undefined then); htcp_sources is the number of senders seen; all other
+    metrics return the cumulative counter named by their 'int_name'.
+    """
+    metric = metrics[name]
+
+    with htcpseqcheck.stats_lock: # Critical section
+        if name == "htcp_losspct":
+            dequeued = htcpseqcheck.slidingcounts['dequeued']
+            if not dequeued:
+                return None
+            return float(htcpseqcheck.slidingcounts['lost']) / dequeued * 100
+        elif name == "htcp_sources":
+            return len(htcpseqcheck.sourcebuf)
+        else:
+            return htcpseqcheck.totalcounts[metric['int_name']]
+
+if __name__ == '__main__':
+    # Print a "metric { ... }" stanza per metric, in the form used by the
+    # collection_group section of htcpseqcheck.pyconf.
+    for metric in build_metrics_dict().itervalues():
+        print "  metric {\n    name = \"%(name)s\"\n    title = \"%(description)s\"\n  }\n" % metric
\ No newline at end of file
Property changes on: trunk/udpmcast/htcpseqcheck_ganglia.py
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/udpmcast/udpmcast.py
===================================================================
--- trunk/udpmcast/udpmcast.py 2011-08-10 14:23:25 UTC (rev 94155)
+++ trunk/udpmcast/udpmcast.py 2011-08-10 14:57:26 UTC (rev 94156)
@@ -6,15 +6,11 @@
#
# $Id$
+import util
import socket, getopt, sys, pwd, grp
-debugging = False
+from util import debug, debugging  # debug() is still called below; -v sets util.debugging, not this copy
-def debug(msg):
- global debugging
- if debugging:
- print msg;
-
def multicast_diagrams(sock, addrrules):
portnr = sock.getsockname()[1];
@@ -88,7 +84,7 @@
elif option == '-g':
group = value
elif option == '-v':
- debugging = True
+ util.debugging = True
elif option == '-t':
multicast_ttl = int(value)
@@ -101,25 +97,20 @@
print "Error: Could not change uid or gid."
sys.exit(-1)
+
# Become a daemon
if daemon:
- from util import createDaemon
- createDaemon()
+ util.createDaemon()
- # Open the UDP socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.bind((host, portnr))
-
- # Set the multicast TTL if requested
+ sock = util.open_htcp_socket(host, portnr)
+
if multicast_ttl is not None:
- sock.setsockopt(socket.IPPROTO_IP,
- socket.IP_MULTICAST_TTL,
- multicast_ttl)
+ util.set_multicast_ttl(sock, multicast_ttl)
# Join a multicast group if requested
if multicast_group is not None:
debug('Joining multicast group ' + multicast_group)
- join_multicast_group(sock, multicast_group)
+ util.join_multicast_group(sock, multicast_group)
# Parse the argument list
addrrules = { 0: [] }
Modified: trunk/udpmcast/util.py
===================================================================
--- trunk/udpmcast/util.py 2011-08-10 14:23:25 UTC (rev 94155)
+++ trunk/udpmcast/util.py 2011-08-10 14:57:26 UTC (rev 94156)
@@ -1,8 +1,45 @@
# util.py
# utility functions shared by udpmcast and htcpseqcheck
-import os, signal
+import sys, os, signal, socket
+# Globals
+
+# When True, debug() writes its messages to stderr. The -v options of
+# htcpseqcheck and udpmcast enable it by setting util.debugging = True.
+debugging = False
+
+def debug(msg):
+    """Write msg to stderr, prefixed with "DEBUG:", if debugging is on."""
+    if not debugging:
+        return
+    print >> sys.stderr, "DEBUG:", msg
+
+def open_htcp_socket(host="", portnr=4827):
+    """
+    Open a UDP socket with SO_REUSEADDR set and bind it to (host, portnr).
+
+    Returns the bound socket. If setting the option or binding fails, the
+    socket is closed before the exception is re-raised, so no descriptor
+    is leaked.
+    """
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+    try:
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.bind((host, portnr))
+    except Exception:
+        sock.close()
+        raise
+
+    return sock
+
+def join_multicast_group(sock, multicast_group):
+    """
+    Join the IPv4 multicast group `multicast_group` on any local interface
+    and disable multicast loopback on `sock`.
+    """
+    import struct
+
+    # ip_mreq: packed group address followed by the local interface address
+    group_addr = socket.inet_aton(multicast_group)
+    membership = struct.pack('!4sl', group_addr, socket.INADDR_ANY)
+    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
+
+    # We do not want to see our own messages back
+    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
+
+def set_multicast_ttl(sock, ttl):
+    """Set the IP_MULTICAST_TTL option of `sock` to `ttl`."""
+    level, option = socket.IPPROTO_IP, socket.IP_MULTICAST_TTL
+    sock.setsockopt(level, option, ttl)
+
+
def createDaemon():
"""
Detach a process from the controlling terminal and run it in the
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs