Revision: 400
Author: bslatkin
Date: Tue Nov 16 12:40:57 2010
Log: hub: adds mr for counting subscriptions by topic/callback pattern
http://code.google.com/p/pubsubhubbub/source/detail?r=400

Modified:
 /trunk/hub/mapreduce.yaml
 /trunk/hub/offline_jobs.py
 /trunk/hub/offline_jobs_test.py

=======================================
--- /trunk/hub/mapreduce.yaml   Mon Nov 15 13:38:44 2010
+++ /trunk/hub/mapreduce.yaml   Tue Nov 16 12:40:57 2010
@@ -37,3 +37,19 @@
       default: 100000
     - name: threshold_timestamp
     params_validator: offline_jobs.SubscriptionReconfirmMapper.validate_params
+- name: Count subscribers by topic and callback pattern
+    mapper:
+      input_reader: offline_jobs.HashKeyDatastoreInputReader
+      handler: offline_jobs.CountSubscribers.run
+      params:
+      - name: entity_kind
+        default: main.Subscription
+      - name: shard_count
+        default: 128
+      - name: processing_rate
+        default: 1000000
+      - name: topic_pattern
+        default: http(s)?://.*
+      - name: callback_pattern
+        default: http(?:s)?://(?:[^\\.]+\\.)*([^\\./]+\.[^\\./]+)(?:/.*)?
+    params_validator: offline_jobs.CountSubscribers.validate_params
=======================================
--- /trunk/hub/offline_jobs.py  Mon Nov 15 13:38:44 2010
+++ /trunk/hub/offline_jobs.py  Tue Nov 16 12:40:57 2010
@@ -20,6 +20,7 @@
 import datetime
 import logging
 import math
+import re
 import time

 from google.appengine.ext import db
@@ -66,6 +67,39 @@
       yield op.db.Delete(event)


+class CountSubscribers(object):
+  """Mapper counts subscribers to a feed pattern by domain.
+
+  Args:
+    topic_pattern: Fully-matching regular expression pattern for topics to
+      include in the count.
+    callback_pattern: Full-matching regular expression pattern for callback
+      URLs, where the first group is used as the aggregation key for counters.
+  """
+
+  @staticmethod
+  def validate_params(params):
+    topic_pattern = params['topic_pattern']
+    assert topic_pattern and re.compile(topic_pattern)
+    callback_pattern = params['callback_pattern']
+    assert callback_pattern and re.compile(callback_pattern)
+
+  def __init__(self):
+    self.topic_pattern = None
+    self.callback_pattern = None
+
+  def run(self, subscription):
+    if self.topic_pattern is None:
+      params = context.get().mapreduce_spec.mapper.params
+      self.topic_pattern = re.compile(params['topic_pattern'])
+      self.callback_pattern = re.compile(params['callback_pattern'])
+
+    if self.topic_pattern.match(subscription.topic):
+      the_match = self.callback_pattern.match(subscription.callback)
+      if the_match:
+        yield op.counters.Increment(the_match.group(1))
+
+
 class HashKeyDatastoreInputReader(input_readers.DatastoreInputReader):
   """A DatastoreInputReader that can split evenly across hash key ranges.

=======================================
--- /trunk/hub/offline_jobs_test.py     Mon Nov 15 13:38:44 2010
+++ /trunk/hub/offline_jobs_test.py     Tue Nov 16 12:40:57 2010
@@ -20,6 +20,7 @@
 import datetime
 import logging
 logging.basicConfig(format='%(levelname)-8s %(filename)s] %(message)s')
+import re
 import time
 import unittest

@@ -246,6 +247,114 @@
     task = testutil.get_tasks(main.POLLING_QUEUE, index=0, expected_count=1)
     self.assertEquals('polling', task['headers']['X-AppEngine-QueueName'])

+################################################################################
+
+class CountSubscribersTest(unittest.TestCase):
+  """Tests for the CountSubscribers job."""
+
+  def setUp(self):
+    """Sets up the test harness."""
+    testutil.setup_for_testing()
+    self.mapper = offline_jobs.CountSubscribers()
+    self.callback = 'http://foo.callback-example.com/my-callback-url'
+    self.topic = 'http://example.com/my-topic-url'
+    self.token = 'token'
+    self.secret = 'my secrat'
+    # Do not make these raw strings on purpose, since they will get
+    # passed through escaped in the mapreduce.yaml.
+    self.topic_pattern = '^http://example\\.com/.*$'
+    self.callback_pattern = (
+        'http(?:s)?://(?:[^\\.]+\\.)*([^\\./]+\.[^\\./]+)(?:/.*)?')
+
+    class FakeMapper(object):
+      params = {
+        'topic_pattern': self.topic_pattern,
+        'callback_pattern': self.callback_pattern,
+      }
+    class FakeSpec(object):
+      mapreduce_id = '1234'
+      mapper = FakeMapper()
+    self.context = context.Context(FakeSpec(), None)
+    context.Context._set(self.context)
+
+  def get_subscription(self):
+    """Returns the Subscription used for testing."""
+    self.assertTrue(Subscription.insert(
+        self.callback, self.topic, self.token, self.secret))
+    return Subscription.get_by_key_name(
+        Subscription.create_key_name(self.callback, self.topic))
+
+  def testExpressions(self):
+    """Tests the default expressions we're going to use for callbacks."""
+    callback_re = re.compile(self.callback_pattern)
+    self.assertEquals(
+        'blah.com',
+        callback_re.match('http://foo.blah.com/stuff').group(1))
+    self.assertEquals(
+        'blah.com',
+        callback_re.match('http://blah.com/stuff').group(1))
+    self.assertEquals(
+        'blah.com',
+        callback_re.match('http://one.two.three.blah.com/stuff').group(1))
+    self.assertEquals(
+        'blah.com',
+        callback_re.match('http://no-ending.blah.com').group(1))
+    self.assertEquals(
+        'example.com',
+        callback_re.match('https://fun.with.https.example.com/').group(1))
+
+  def testValidateParams(self):
+    """Tests the validate_params function."""
+    self.assertRaises(
+        KeyError,
+        offline_jobs.CountSubscribers.validate_params,
+        {})
+    self.assertRaises(
+        AssertionError,
+        offline_jobs.CountSubscribers.validate_params,
+        {'topic_pattern': ''})
+    self.assertRaises(
+        re.error,
+        offline_jobs.CountSubscribers.validate_params,
+        {'topic_pattern': 'this is bad('})
+    self.assertRaises(
+        KeyError,
+        offline_jobs.CountSubscribers.validate_params,
+        {'topic_pattern': 'okay'})
+    self.assertRaises(
+        AssertionError,
+        offline_jobs.CountSubscribers.validate_params,
+        {'topic_pattern': 'okay', 'callback_pattern': ''})
+    self.assertRaises(
+        re.error,
+        offline_jobs.CountSubscribers.validate_params,
+        {'topic_pattern': 'okay', 'callback_pattern': 'this is bad('})
+    offline_jobs.CountSubscribers.validate_params(
+        {'topic_pattern': 'okay', 'callback_pattern': 'and okay'})
+
+  def testTopicMatch_CallbackMatch(self):
+    """Tests when the topic and callbacks match."""
+    sub = self.get_subscription()
+    gen = self.mapper.run(sub)
+    counter = gen.next()
+    self.assertEquals('callback-example.com', counter.counter_name)
+    self.assertEquals(1, counter.delta)
+    self.assertRaises(StopIteration, gen.next)
+
+  def testTopicMatch_CallbackNoMatch(self):
+    """Tests when the topic matches but the callback does not."""
+    self.callback = 'some garbage'
+    sub = self.get_subscription()
+    gen = self.mapper.run(sub)
+    self.assertRaises(StopIteration, gen.next)
+
+  def testTopicNoMatch(self):
+    """Tests when the topic does not match."""
+    self.topic = 'http://does-not-match.com'
+    sub = self.get_subscription()
+    gen = self.mapper.run(sub)
+    self.assertRaises(StopIteration, gen.next)
+
################################################################################

 if __name__ == '__main__':

Reply via email to