Revision: 400
Author: bslatkin
Date: Tue Nov 16 12:40:57 2010
Log: hub: adds mr for counting subscriptions by topic/callback pattern
http://code.google.com/p/pubsubhubbub/source/detail?r=400
Modified:
/trunk/hub/mapreduce.yaml
/trunk/hub/offline_jobs.py
/trunk/hub/offline_jobs_test.py
=======================================
--- /trunk/hub/mapreduce.yaml Mon Nov 15 13:38:44 2010
+++ /trunk/hub/mapreduce.yaml Tue Nov 16 12:40:57 2010
@@ -37,3 +37,19 @@
default: 100000
- name: threshold_timestamp
params_validator: offline_jobs.SubscriptionReconfirmMapper.validate_params
+- name: Count subscribers by topic and callback pattern
+ mapper:
+ input_reader: offline_jobs.HashKeyDatastoreInputReader
+ handler: offline_jobs.CountSubscribers.run
+ params:
+ - name: entity_kind
+ default: main.Subscription
+ - name: shard_count
+ default: 128
+ - name: processing_rate
+ default: 1000000
+ - name: topic_pattern
+ default: http(s)?://.*
+ - name: callback_pattern
+ default: http(?:s)?://(?:[^\\.]+\\.)*([^\\./]+\.[^\\./]+)(?:/.*)?
+ params_validator: offline_jobs.CountSubscribers.validate_params
=======================================
--- /trunk/hub/offline_jobs.py Mon Nov 15 13:38:44 2010
+++ /trunk/hub/offline_jobs.py Tue Nov 16 12:40:57 2010
@@ -20,6 +20,7 @@
import datetime
import logging
import math
+import re
import time
from google.appengine.ext import db
@@ -66,6 +67,39 @@
yield op.db.Delete(event)
+class CountSubscribers(object):
+ """Mapper counts subscribers to a feed pattern by domain.
+
+ Args:
+ topic_pattern: Fully-matching regular expression pattern for topics to
+ include in the count.
+ callback_pattern: Full-matching regular expression pattern for callback
+ URLs, where the first group is used as the aggregation key for counters.
+ """
+
+ @staticmethod
+ def validate_params(params):
+ topic_pattern = params['topic_pattern']
+ assert topic_pattern and re.compile(topic_pattern)
+ callback_pattern = params['callback_pattern']
+ assert callback_pattern and re.compile(callback_pattern)
+
+ def __init__(self):
+ self.topic_pattern = None
+ self.callback_pattern = None
+
+ def run(self, subscription):
+ if self.topic_pattern is None:
+ params = context.get().mapreduce_spec.mapper.params
+ self.topic_pattern = re.compile(params['topic_pattern'])
+ self.callback_pattern = re.compile(params['callback_pattern'])
+
+ if self.topic_pattern.match(subscription.topic):
+ the_match = self.callback_pattern.match(subscription.callback)
+ if the_match:
+ yield op.counters.Increment(the_match.group(1))
+
+
class HashKeyDatastoreInputReader(input_readers.DatastoreInputReader):
"""A DatastoreInputReader that can split evenly across hash key ranges.
=======================================
--- /trunk/hub/offline_jobs_test.py Mon Nov 15 13:38:44 2010
+++ /trunk/hub/offline_jobs_test.py Tue Nov 16 12:40:57 2010
@@ -20,6 +20,7 @@
import datetime
import logging
logging.basicConfig(format='%(levelname)-8s %(filename)s] %(message)s')
+import re
import time
import unittest
@@ -246,6 +247,114 @@
task = testutil.get_tasks(main.POLLING_QUEUE, index=0,
expected_count=1)
self.assertEquals('polling', task['headers']['X-AppEngine-QueueName'])
+################################################################################
+
+class CountSubscribersTest(unittest.TestCase):
+ """Tests for the CountSubscribers job."""
+
+ def setUp(self):
+ """Sets up the test harness."""
+ testutil.setup_for_testing()
+ self.mapper = offline_jobs.CountSubscribers()
+ self.callback = 'http://foo.callback-example.com/my-callback-url'
+ self.topic = 'http://example.com/my-topic-url'
+ self.token = 'token'
+ self.secret = 'my secrat'
+ # Do not make these raw strings on purpose, since they will get
+ # passed through escaped in the mapreduce.yaml.
+ self.topic_pattern = '^http://example\\.com/.*$'
+ self.callback_pattern = (
+ 'http(?:s)?://(?:[^\\.]+\\.)*([^\\./]+\.[^\\./]+)(?:/.*)?')
+
+ class FakeMapper(object):
+ params = {
+ 'topic_pattern': self.topic_pattern,
+ 'callback_pattern': self.callback_pattern,
+ }
+ class FakeSpec(object):
+ mapreduce_id = '1234'
+ mapper = FakeMapper()
+ self.context = context.Context(FakeSpec(), None)
+ context.Context._set(self.context)
+
+ def get_subscription(self):
+ """Returns the Subscription used for testing."""
+ self.assertTrue(Subscription.insert(
+ self.callback, self.topic, self.token, self.secret))
+ return Subscription.get_by_key_name(
+ Subscription.create_key_name(self.callback, self.topic))
+
+ def testExpressions(self):
+ """Tests the default expressions we're going to use for callbacks."""
+ callback_re = re.compile(self.callback_pattern)
+ self.assertEquals(
+ 'blah.com',
+ callback_re.match('http://foo.blah.com/stuff').group(1))
+ self.assertEquals(
+ 'blah.com',
+ callback_re.match('http://blah.com/stuff').group(1))
+ self.assertEquals(
+ 'blah.com',
+ callback_re.match('http://one.two.three.blah.com/stuff').group(1))
+ self.assertEquals(
+ 'blah.com',
+ callback_re.match('http://no-ending.blah.com').group(1))
+ self.assertEquals(
+ 'example.com',
+ callback_re.match('https://fun.with.https.example.com/').group(1))
+
+ def testValidateParams(self):
+ """Tests the validate_params function."""
+ self.assertRaises(
+ KeyError,
+ offline_jobs.CountSubscribers.validate_params,
+ {})
+ self.assertRaises(
+ AssertionError,
+ offline_jobs.CountSubscribers.validate_params,
+ {'topic_pattern': ''})
+ self.assertRaises(
+ re.error,
+ offline_jobs.CountSubscribers.validate_params,
+ {'topic_pattern': 'this is bad('})
+ self.assertRaises(
+ KeyError,
+ offline_jobs.CountSubscribers.validate_params,
+ {'topic_pattern': 'okay'})
+ self.assertRaises(
+ AssertionError,
+ offline_jobs.CountSubscribers.validate_params,
+ {'topic_pattern': 'okay', 'callback_pattern': ''})
+ self.assertRaises(
+ re.error,
+ offline_jobs.CountSubscribers.validate_params,
+ {'topic_pattern': 'okay', 'callback_pattern': 'this is bad('})
+ offline_jobs.CountSubscribers.validate_params(
+ {'topic_pattern': 'okay', 'callback_pattern': 'and okay'})
+
+ def testTopicMatch_CallbackMatch(self):
+ """Tests when the topic and callbacks match."""
+ sub = self.get_subscription()
+ gen = self.mapper.run(sub)
+ counter = gen.next()
+ self.assertEquals('callback-example.com', counter.counter_name)
+ self.assertEquals(1, counter.delta)
+ self.assertRaises(StopIteration, gen.next)
+
+ def testTopicMatch_CallbackNoMatch(self):
+ """Tests when the topic matches but the callback does not."""
+ self.callback = 'some garbage'
+ sub = self.get_subscription()
+ gen = self.mapper.run(sub)
+ self.assertRaises(StopIteration, gen.next)
+
+ def testTopicNoMatch(self):
+ """Tests when the topic does not match."""
+ self.topic = 'http://does-not-match.com'
+ sub = self.get_subscription()
+ gen = self.mapper.run(sub)
+ self.assertRaises(StopIteration, gen.next)
+
################################################################################
if __name__ == '__main__':