Revision: 351
Author: bslatkin
Date: Thu Mar 11 23:39:17 2010
Log: hub: adds memory optimizations for stats, domain mapping cache
http://code.google.com/p/pubsubhubbub/source/detail?r=351
Modified:
/trunk/hub/dos.py
/trunk/hub/dos_test.py
/trunk/hub/main.py
/trunk/hub/stats_table.html
=======================================
--- /trunk/hub/dos.py Mon Mar 1 12:24:09 2010
+++ /trunk/hub/dos.py Thu Mar 11 23:39:17 2010
@@ -17,6 +17,7 @@
"""Decorators and utilities for attack protection and statistics
gathering."""
+import gc
import logging
import os
import random
@@ -197,9 +198,21 @@
'livejournal.com',
])
+# Maximum size of the cache of URLs to domains.
+DOMAIN_CACHE_SIZE = 100
+
+# Simple local cache used for per-request URL to domain mappings.
+_DOMAIN_CACHE = {}
+
def get_url_domain(url):
"""Returns the domain for a URL or 'bad_url if it's not a valid URL."""
+ result = _DOMAIN_CACHE.get(url)
+ if result is not None:
+ return result
+ if len(_DOMAIN_CACHE) >= DOMAIN_CACHE_SIZE:
+ _DOMAIN_CACHE.clear()
+
match = URL_DOMAIN_RE.match(url)
if match:
groups = list(match.groups())
@@ -209,7 +222,10 @@
groups = filter(bool, groups)
else:
groups = []
- return (groups + ['bad_url'])[0]
+ result = (groups + ['bad_url'])[0]
+
+ _DOMAIN_CACHE[url] = result
+ return result
################################################################################
@@ -402,7 +418,11 @@
value = key
else:
value = get_url_domain(key)
- return unicode(value[:self.max_value_length]).encode('utf-8')
+ if len(value) > self.max_value_length:
+ value = value[:self.max_value_length]
+ if isinstance(value, unicode):
+ value = unicode(value).encode('utf-8')
+ return value
def should_sample(self, key, coin_flip):
"""Checks if the key should be sampled.
@@ -804,6 +824,10 @@
Returns:
SampleResult object containing the result data.
"""
+ # Make sure the key is converted into the format expected by the
config.
+ if single_key is not None:
+ single_key = config.adjust_value(single_key)
+
keys = [config.start_key, config.counter_key]
for i in xrange(config.samples):
keys.append(config.position_key(i))
@@ -850,6 +874,29 @@
return results
+ def get_chain(self, *configs, **kwargs):
+ """Gets statistics for a set of configs, optionally for a single key.
+
+ Retrieves multiple configs sequentially in a way that ensures the
+ memory used by each previous result is reclaimed before the
+ next one is returned.
+
+ Args:
+ *configs: Set of configs to retrieve.
+ **kwargs: Keyword arguments to pass to the 'get' method of this
class.
+
+ Returns:
+ Generator that yields each SampleResult object for each config.
+ """
+ for config in configs:
+ result = self.get(config, **kwargs)
+ yield result
+ del result
+ # NOTE: This kinda sucks, but the result sets are really large so
+ # we need to make sure the garbage collector is doing its job so we
+ # don't bloat memory over the course of a single stats request.
+ gc.collect()
+
################################################################################
class UrlScorer(object):
=======================================
--- /trunk/hub/dos_test.py Mon Mar 1 12:24:09 2010
+++ /trunk/hub/dos_test.py Thu Mar 11 23:39:17 2010
@@ -17,9 +17,12 @@
"""Tests for the dos module."""
+import cProfile
+import gc
import logging
logging.basicConfig(format='%(levelname)-8s %(filename)s] %(message)s')
import os
+import random
import sys
import unittest
@@ -538,6 +541,29 @@
self.assertEquals('bad_url',
dos.get_url_domain('192.168.0.1'))
+ def testCaching(self):
+ """Tests that cache eviction works properly."""
+ dos._DOMAIN_CACHE.clear()
+ old_size = dos.DOMAIN_CACHE_SIZE
+ try:
+ dos.DOMAIN_CACHE_SIZE = 2
+ dos._DOMAIN_CACHE['http://a.example.com/stuff'] = 'a.example.com'
+ dos._DOMAIN_CACHE['http://b.example.com/stuff'] = 'b.example.com'
+ dos._DOMAIN_CACHE['http://c.example.com/stuff'] = 'c.example.com'
+ self.assertEquals(3, len(dos._DOMAIN_CACHE))
+
+ # Old cache entries are hit:
+ self.assertEquals('c.example.com',
+ dos.get_url_domain('http://c.example.com/stuff'))
+ self.assertEquals(3, len(dos._DOMAIN_CACHE))
+
+ # New cache entries clear the contents.
+ self.assertEquals('d.example.com',
+ dos.get_url_domain('http://d.example.com/stuff'))
+ self.assertEquals(1, len(dos._DOMAIN_CACHE))
+ finally:
+ dos.DOMAIN_CACHE_SIZE = old_size
+
################################################################################
class OffsetOrAddTest(unittest.TestCase):
@@ -1069,7 +1095,7 @@
self.verify_sample(results, self.domainC, 1, 0.1)
self.verify_sample(results, self.domainD, 1, 0.1)
- results = sampler.get(config, self.domainB)
+ results = sampler.get(config, self.url2)
self.assertEquals(6, results.total_samples)
self.assertEquals(6, results.unique_samples)
self.verify_sample(results, self.domainB, 6, 0.6)
@@ -1254,6 +1280,74 @@
self.verify_sample(results, self.domainC, 1, 0.25)
self.verify_sample(results, self.domainD, 1, 0.25)
+ def testGetChain(self):
+ """Tests getting results from multiple configs in a single call."""
+ config1 = dos.ReservoirConfig(
+ 'first',
+ period=300,
+ rate=1,
+ samples=10000,
+ by_domain=True)
+ config2 = dos.ReservoirConfig(
+ 'second',
+ period=300,
+ rate=1,
+ samples=10000,
+ by_url=True)
+ sampler = dos.MultiSampler([config1, config2],
gettime=self.fake_gettime)
+
+ reporter = dos.Reporter()
+ reporter.set(self.url1, config1)
+ reporter.set(self.url2, config1)
+ reporter.set(self.url3, config1)
+ reporter.set(self.url4, config1)
+ reporter.set(self.url5, config1)
+ reporter.set(self.url1, config2)
+ reporter.set(self.url2, config2)
+ reporter.set(self.url3, config2)
+ reporter.set(self.url4, config2)
+ reporter.set(self.url5, config2)
+ self.gettime_results.extend([0, 10, 10, 10, 10])
+ sampler.sample(reporter)
+ result_iter = sampler.get_chain(config1, config2)
+
+ # Results for config1
+ results = result_iter.next()
+ self.assertEquals(5, results.total_samples)
+ self.assertEquals(5, results.unique_samples)
+ self.verify_sample(results, self.domainA, 1, 0.1)
+ self.verify_sample(results, self.domainB, 2, 0.2)
+ self.verify_sample(results, self.domainC, 1, 0.1)
+ self.verify_sample(results, self.domainD, 1, 0.1)
+
+ # Results for config2
+ results = result_iter.next()
+ self.assertEquals(5, results.total_samples)
+ self.assertEquals(5, results.unique_samples)
+ self.verify_sample(results, self.url1, 1, 0.1)
+ self.verify_sample(results, self.url2, 1, 0.1)
+ self.verify_sample(results, self.url3, 1, 0.1)
+ self.verify_sample(results, self.url4, 1, 0.1)
+ self.verify_sample(results, self.url5, 1, 0.1)
+
+ # Single key test
+ result_iter = sampler.get_chain(
+ config1, config2,
+ single_key=self.url2)
+
+ # Results for config1
+ results = result_iter.next()
+ self.assertEquals(2, results.total_samples)
+ self.assertEquals(2, results.unique_samples)
+ self.verify_sample(results, self.domainB, 2, 0.2)
+
+ # Results for config2
+ results = result_iter.next()
+ self.assertEquals(1, results.total_samples)
+ self.assertEquals(1, results.unique_samples)
+ self.verify_sample(results, self.url2, 1, 0.1)
+
+
def testConfig(self):
"""Tests config validation."""
# Bad name.
@@ -1373,6 +1467,84 @@
tolerance='bad',
by_domain=True)
+ def testSampleProfile(self):
+ """Profiles the sample method with lots of data."""
+ print 'Tracked objects start',len(gc.get_objects())
+ config = dos.ReservoirConfig(
+ 'testing',
+ period=10,
+ rate=1,
+ samples=10000,
+ by_domain=True)
+ sampler = dos.MultiSampler([config])
+ reporter = dos.Reporter()
+ fake_urls = ['http://example-%s.com/meep' % i
+ for i in xrange(100)]
+ for i in xrange(100000):
+ reporter.set(random.choice(fake_urls), config, random.randint(0,
10000))
+ del fake_urls
+ gc.collect()
+ dos._DOMAIN_CACHE.clear()
+
+ gc.disable()
+ gc.set_debug(gc.DEBUG_STATS | gc.DEBUG_LEAK)
+ try:
+ # Swap the two following lines to profile memory vs. CPU
+ sampler.sample(reporter)
+ #cProfile.runctx('sampler.sample(reporter)', globals(), locals())
+ memcache.flush_all() # Clear the string references
+ print 'Tracked objects before collection', len(gc.get_objects())
+ dos._DOMAIN_CACHE.clear()
+ del reporter
+ del sampler
+ finally:
+ print 'Unreachable', gc.collect()
+ print 'Tracked objects after collection', len(gc.get_objects())
+ gc.set_debug(0)
+ gc.enable()
+
+ def testGetProfile(self):
+ """Profiles the get method when there's lots of data."""
+ print 'Tracked objects start',len(gc.get_objects())
+ config = dos.ReservoirConfig(
+ 'testing',
+ period=10,
+ rate=1,
+ samples=10000,
+ by_domain=True)
+ sampler = dos.MultiSampler([config])
+ reporter = dos.Reporter()
+ fake_urls = ['http://example-%s.com/meep' % i
+ for i in xrange(100)]
+ for i in xrange(100000):
+ reporter.set(random.choice(fake_urls), config, random.randint(0,
10000))
+ del fake_urls
+ dos._DOMAIN_CACHE.clear()
+ gc.collect()
+ sampler.sample(reporter)
+
+ gc.disable()
+ gc.set_debug(gc.DEBUG_STATS | gc.DEBUG_LEAK)
+ try:
+ # Swap the two following lines to profile memory vs. CPU
+ result = sampler.get(config)
+ #cProfile.runctx('result = sampler.get(config)', globals(), locals())
+ memcache.flush_all() # Clear the string references
+ print 'Tracked objects before collection', len(gc.get_objects())
+ try:
+ del locals()['result']
+ del result
+ except:
+ pass
+ dos._DOMAIN_CACHE.clear()
+ del reporter
+ del sampler
+ finally:
+ print 'Unreachable', gc.collect()
+ print 'Tracked objects after collection', len(gc.get_objects())
+ gc.set_debug(0)
+ gc.enable()
+
################################################################################
class UrlScorerTest(unittest.TestCase):
=======================================
--- /trunk/hub/main.py Mon Mar 1 00:07:24 2010
+++ /trunk/hub/main.py Thu Mar 11 23:39:17 2010
@@ -2883,18 +2883,18 @@
'last_header_footer': feed.header_footer,
'fetch_blocked': not fetch_score[0],
'fetch_errors': fetch_score[1] * 100,
- 'fetch_url_error': [
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_MINUTE, topic_url),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_30_MINUTE, topic_url),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_HOUR, topic_url),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_DAY, topic_url),
- ],
- 'fetch_url_latency': [
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_MINUTE_LATENCY, topic_url),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_30_MINUTE_LATENCY,
topic_url),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_HOUR_LATENCY, topic_url),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_DAY_LATENCY, topic_url),
- ],
+ 'fetch_url_error': FETCH_SAMPLER.get_chain(
+ FETCH_URL_SAMPLE_MINUTE,
+ FETCH_URL_SAMPLE_30_MINUTE,
+ FETCH_URL_SAMPLE_HOUR,
+ FETCH_URL_SAMPLE_DAY,
+ single_key=topic_url),
+ 'fetch_url_latency': FETCH_SAMPLER.get_chain(
+ FETCH_URL_SAMPLE_MINUTE_LATENCY,
+ FETCH_URL_SAMPLE_30_MINUTE_LATENCY,
+ FETCH_URL_SAMPLE_HOUR_LATENCY,
+ FETCH_URL_SAMPLE_DAY_LATENCY,
+ single_key=topic_url),
}
fetch = FeedToFetch.get_by_topic(topic_url)
@@ -2958,42 +2958,30 @@
for e in failed_events],
'delivery_blocked': not delivery_score[0],
'delivery_errors': delivery_score[1] * 100,
- 'delivery_url_error': [
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_MINUTE, callback_url),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_30_MINUTE,
callback_url),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_HOUR, callback_url),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_DAY, callback_url),
- ],
- 'delivery_url_latency': [
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_MINUTE_LATENCY,
- callback_url),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_30_MINUTE_LATENCY,
- callback_url),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_HOUR_LATENCY,
- callback_url),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_DAY_LATENCY,
- callback_url),
- ],
- 'delivery_domain_error': [
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_MINUTE,
- callback_domain),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_30_MINUTE,
- callback_domain),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_HOUR,
- callback_domain),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_DAY,
- callback_domain),
- ],
- 'delivery_domain_latency': [
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_MINUTE_LATENCY,
- callback_domain),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_30_MINUTE_LATENCY,
- callback_domain),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_HOUR_LATENCY,
- callback_domain),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_DAY_LATENCY,
- callback_domain),
- ],
+ 'delivery_url_error': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_URL_SAMPLE_MINUTE,
+ DELIVERY_URL_SAMPLE_30_MINUTE,
+ DELIVERY_URL_SAMPLE_HOUR,
+ DELIVERY_URL_SAMPLE_DAY,
+ single_key=callback_url),
+ 'delivery_url_latency': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_URL_SAMPLE_MINUTE_LATENCY,
+ DELIVERY_URL_SAMPLE_30_MINUTE_LATENCY,
+ DELIVERY_URL_SAMPLE_HOUR_LATENCY,
+ DELIVERY_URL_SAMPLE_DAY_LATENCY,
+ single_key=callback_url),
+ 'delivery_domain_error': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_DOMAIN_SAMPLE_MINUTE,
+ DELIVERY_DOMAIN_SAMPLE_30_MINUTE,
+ DELIVERY_DOMAIN_SAMPLE_HOUR,
+ DELIVERY_DOMAIN_SAMPLE_DAY,
+ single_key=callback_url),
+ 'delivery_domain_latency': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_DOMAIN_SAMPLE_MINUTE_LATENCY,
+ DELIVERY_DOMAIN_SAMPLE_30_MINUTE_LATENCY,
+ DELIVERY_DOMAIN_SAMPLE_HOUR_LATENCY,
+ DELIVERY_DOMAIN_SAMPLE_DAY_LATENCY,
+ single_key=callback_url),
})
self.response.out.write(template.render('event_details.html', context))
@@ -3010,54 +2998,46 @@
def get(self):
context = {
- 'fetch_url_error': [
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_MINUTE),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_30_MINUTE),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_HOUR),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_DAY),
- ],
- 'fetch_url_latency': [
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_MINUTE_LATENCY),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_30_MINUTE_LATENCY),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_HOUR_LATENCY),
- FETCH_SAMPLER.get(FETCH_URL_SAMPLE_DAY_LATENCY),
- ],
- 'fetch_domain_error': [
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_MINUTE),
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_30_MINUTE),
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_HOUR),
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_DAY),
- ],
- 'fetch_domain_latency': [
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_MINUTE_LATENCY),
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_30_MINUTE_LATENCY),
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_HOUR_LATENCY),
- FETCH_SAMPLER.get(FETCH_DOMAIN_SAMPLE_DAY_LATENCY),
- ],
- 'delivery_url_error': [
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_MINUTE),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_30_MINUTE),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_HOUR),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_DAY),
- ],
- 'delivery_url_latency': [
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_MINUTE_LATENCY),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_30_MINUTE_LATENCY),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_HOUR_LATENCY),
- DELIVERY_SAMPLER.get(DELIVERY_URL_SAMPLE_DAY_LATENCY),
- ],
- 'delivery_domain_error': [
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_MINUTE),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_30_MINUTE),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_HOUR),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_DAY),
- ],
- 'delivery_domain_latency': [
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_MINUTE_LATENCY),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_30_MINUTE_LATENCY),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_HOUR_LATENCY),
- DELIVERY_SAMPLER.get(DELIVERY_DOMAIN_SAMPLE_DAY_LATENCY),
- ],
+ 'fetch_url_error': FETCH_SAMPLER.get_chain(
+ FETCH_URL_SAMPLE_MINUTE,
+ FETCH_URL_SAMPLE_30_MINUTE,
+ FETCH_URL_SAMPLE_HOUR,
+ FETCH_URL_SAMPLE_DAY),
+ 'fetch_url_latency': FETCH_SAMPLER.get_chain(
+ FETCH_URL_SAMPLE_MINUTE_LATENCY,
+ FETCH_URL_SAMPLE_30_MINUTE_LATENCY,
+ FETCH_URL_SAMPLE_HOUR_LATENCY,
+ FETCH_URL_SAMPLE_DAY_LATENCY),
+ 'fetch_domain_error': FETCH_SAMPLER.get_chain(
+ FETCH_DOMAIN_SAMPLE_MINUTE,
+ FETCH_DOMAIN_SAMPLE_30_MINUTE,
+ FETCH_DOMAIN_SAMPLE_HOUR,
+ FETCH_DOMAIN_SAMPLE_DAY),
+ 'fetch_domain_latency': FETCH_SAMPLER.get_chain(
+ FETCH_DOMAIN_SAMPLE_MINUTE_LATENCY,
+ FETCH_DOMAIN_SAMPLE_30_MINUTE_LATENCY,
+ FETCH_DOMAIN_SAMPLE_HOUR_LATENCY,
+ FETCH_DOMAIN_SAMPLE_DAY_LATENCY),
+ 'delivery_url_error': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_URL_SAMPLE_MINUTE,
+ DELIVERY_URL_SAMPLE_30_MINUTE,
+ DELIVERY_URL_SAMPLE_HOUR,
+ DELIVERY_URL_SAMPLE_DAY),
+ 'delivery_url_latency': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_URL_SAMPLE_MINUTE_LATENCY,
+ DELIVERY_URL_SAMPLE_30_MINUTE_LATENCY,
+ DELIVERY_URL_SAMPLE_HOUR_LATENCY,
+ DELIVERY_URL_SAMPLE_DAY_LATENCY),
+ 'delivery_domain_error': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_DOMAIN_SAMPLE_MINUTE,
+ DELIVERY_DOMAIN_SAMPLE_30_MINUTE,
+ DELIVERY_DOMAIN_SAMPLE_HOUR,
+ DELIVERY_DOMAIN_SAMPLE_DAY),
+ 'delivery_domain_latency': DELIVERY_SAMPLER.get_chain(
+ DELIVERY_DOMAIN_SAMPLE_MINUTE_LATENCY,
+ DELIVERY_DOMAIN_SAMPLE_30_MINUTE_LATENCY,
+ DELIVERY_DOMAIN_SAMPLE_HOUR_LATENCY,
+ DELIVERY_DOMAIN_SAMPLE_DAY_LATENCY),
}
all_configs = []
all_configs.extend(FETCH_SAMPLER.configs)
=======================================
--- /trunk/hub/stats_table.html Mon Mar 1 12:24:09 2010
+++ /trunk/hub/stats_table.html Thu Mar 11 23:39:17 2010
@@ -5,7 +5,7 @@
{% if show_everything %}
<span>Total samples: {{result.total_samples}}</span>
<span>Unique keys: {{result.unique_samples}}</span>
- <span>Overall rate: {{result.overall_rate|floatformat:"-4"}}</span>
+ <span>Overall rate: {{result.overall_rate|floatformat:"-4"}}/sec</span>
{% endif %}
</div>
{% if result.unique_samples %}