Revision: 407
Author:   bslatkin
Date:     Tue Jul 26 09:40:18 2011
Log: Adds subscription counting mapreduce; includes counts in fetch requests; updates all tests to run under python2.6 local environment
http://code.google.com/p/pubsubhubbub/source/detail?r=407

Added:
 /trunk/hub/run_tests.sh
Deleted:
 /trunk/hub/event_test.html
Modified:
 /trunk/hub/app.yaml
 /trunk/hub/dos.py
 /trunk/hub/fork_join_queue.py
 /trunk/hub/fork_join_queue_test.py
 /trunk/hub/main.py
 /trunk/hub/main_test.py
 /trunk/hub/mapreduce.yaml
 /trunk/hub/offline_jobs.py
 /trunk/hub/offline_jobs_test.py
 /trunk/hub/topic_details.html

=======================================
--- /dev/null
+++ /trunk/hub/run_tests.sh     Tue Jul 26 09:40:18 2011
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+for test_file in $(ls *_test.py ../nonstandard/*_test.py)
+do
+  echo -e "========== Running $test_file"
+  ./$test_file
+  if [ "$?" -ne "0" ]; then
+    echo "Died in $test_file"
+    exit 1
+  fi
+done
=======================================
--- /trunk/hub/event_test.html  Fri Oct  3 00:27:55 2008
+++ /dev/null
@@ -1,28 +0,0 @@
-<html>
-<head>
-  <title>Event test</title>
-</head>
-<body>
-
-<h1>Event test</h1>
-
-<form action="/event_test" method="post">
-  <p>
-    <div><label for="topic_url">Topic URL:</label></div>
-    <input type="text" name="topic_url" id="topic_url" value="{{topic_url}}" size="50"/>
-  </p>
-  <p>
-    <div><label for="callback_urls">Callback URLs (one per line):</topic></div>
-    <textarea name="callback_urls" id="callback_urls" rows="8" cols="50">{{callback_urls}}</textarea>
-  </p>
-  <p><input type="submit" value="Simulate"></p>
-</form>
-
-{% if info %}
-<pre>
-{{info}}
-</pre>
-{% endif %}
-
-</body>
-</html>
=======================================
--- /trunk/hub/app.yaml Sat Jun  5 18:18:50 2010
+++ /trunk/hub/app.yaml Tue Jul 26 09:40:18 2011
@@ -27,11 +27,24 @@
   upload: favicon\.ico
   secure: optional

+# Admin tools
 - url: /remote_api
   script: $PYTHON_LIB/google/appengine/ext/remote_api/handler.py
   login: admin
   secure: optional

+- url: /admin(/.*)?
+  script: $PYTHON_LIB/google/appengine/ext/admin
+  login: admin
+
+- url: /stats
+  script: main.py
+  login: admin
+
+- url: /mapreduce(/.*)?
+  script: mapreduce/main.py
+  login: admin
+
 # Optional bookmarklet creation gadget.
 - url: /bookmarklet(_jsonp\.min\.js|\.min\.js|\.html|_config\.html|_gadget\.xml)
   static_files: bookmarklet/bookmarklet\1
@@ -43,16 +56,7 @@
   script: main.py
   secure: always

-# Global stats are admin only.
-- url: /stats
-  script: main.py
-  login: admin
-
-# Mapreduce for running offline jobs.
-- url: /mapreduce(/.*)?
-  script: mapreduce/main.py
-  login: admin
-
+# Everything else
 - url: .*
   script: main.py
   secure: optional
=======================================
--- /trunk/hub/dos.py   Thu Mar 11 23:39:17 2010
+++ /trunk/hub/dos.py   Tue Jul 26 09:40:18 2011
@@ -274,8 +274,8 @@

   failed_keys = set(k for k, v in second_results.iteritems() if v is None)
   if failed_keys:
-    logging.critical('Failed memcache offset_or_add for prefix=%r, keys=%r',
-                     prefix, failed_keys)
+    logging.warning('Failed memcache offset_or_add for prefix=%r, keys=%r',
+                    prefix, failed_keys)

   results.update(second_results)
   return results
=======================================
--- /trunk/hub/fork_join_queue.py       Sun Jul 11 22:57:08 2010
+++ /trunk/hub/fork_join_queue.py       Tue Jul 26 09:40:18 2011
@@ -90,7 +90,7 @@

 from google.net.proto import ProtocolBuffer
 from google.appengine.api import memcache
-from google.appengine.api.labs import taskqueue
+from google.appengine.api import taskqueue
 from google.appengine.ext import db

 # TODO: Consider using multiple work indexes to alleviate the memcache
@@ -388,7 +388,8 @@
             params={'cursor': cursor}
           ).add(self.get_queue_name(index))
           break
-        except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
+        except (taskqueue.TaskAlreadyExistsError,
+                taskqueue.TombstonedTaskError):
           # This means the continuation chain already started and this root
           # task failed for some reason; no problem.
           break
=======================================
--- /trunk/hub/fork_join_queue_test.py  Sun Jul 11 22:57:08 2010
+++ /trunk/hub/fork_join_queue_test.py  Tue Jul 26 09:40:18 2011
@@ -293,7 +293,10 @@
     work_index = TEST_QUEUE.next_index()
     tasks = []
     for i in xrange(6):
-      tasks.append(TestModel(work_index=work_index, number=i))
+      # Simplify tests by assigning the key names of the TestModel, making it
+      # so the values returned by pop_request() below are predictable.
+      key = db.Key.from_path(TestModel.kind(), i+1)
+      tasks.append(TestModel(key=key, work_index=work_index, number=i))
     db.put(tasks)
     TEST_QUEUE.add(work_index, gettime=self.gettime1)

@@ -397,13 +400,15 @@
     """Tests adding and popping from a sharded queue with continuation."""
     from google.appengine.api import apiproxy_stub_map
     stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
-    old_valid = stub._IsValidQueue
-    stub._IsValidQueue = lambda *a, **k: True
+    stub._queues[None]._all_queues_valid = True
     try:
       work_index = SHARDED_QUEUE.next_index()
       tasks = []
       for i in xrange(5):
-        tasks.append(TestModel(work_index=work_index, number=i))
+        # Simplify tests by assigning the key names of the TestModel, making it
+        # so the values returned by pop_request() below are predictable.
+        key = db.Key.from_path(TestModel.kind(), i+1)
+        tasks.append(TestModel(key=key, work_index=work_index, number=i))
       db.put(tasks)
       SHARDED_QUEUE.add(work_index, gettime=self.gettime1)
       queue_name = 'default-%d' % (1 + (work_index % 4))
@@ -427,7 +432,7 @@
       self.assertTrue('cursor' in next_task['params'])
       self.assertTrue(next_task['name'].endswith('-1'))
     finally:
-      stub._IsValidQueue = old_valid
+      stub._queues[None]._all_queues_valid = False

   def testMemcacheQueue(self):
     """Tests adding and popping from an in-memory queue with continuation."""
=======================================
--- /trunk/hub/main.py  Mon Nov 15 15:47:16 2010
+++ /trunk/hub/main.py  Tue Jul 26 09:40:18 2011
@@ -35,6 +35,9 @@
   conjunction with KnownFeed to properly canonicalize feed aliases on
   subscription and pinging.

+* KnownFeedStats: Statistics about a topic URL. Used to provide subscriber
+  counts to publishers on feed fetch.
+
 * FeedRecord: Metadata information about a feed, the last time it was polled,
   and any headers that may affect future polling. Also contains any debugging
   information about the last feed fetch and why it may have failed.
@@ -75,11 +78,6 @@

 # Bigger TODOs (in priority order)
 #
-# - Add subscription counting to PushEventHandler so we can deliver a header
-#   with the number of subscribers the feed has. This will simply just keep
-#   count of the subscribers seen so far and then when the pushing is done it
-#   will save that total back on the FeedRecord instance.
-#
 # - Improve polling algorithm to keep stats on each feed.
 #
 # - Do not poll a feed if we've gotten an event from the publisher in less
@@ -105,8 +103,8 @@
 from google.appengine.api import memcache
 from google.appengine.api import urlfetch
 from google.appengine.api import urlfetch_errors
+from google.appengine.api import taskqueue
 from google.appengine.api import users
-from google.appengine.api.labs import taskqueue
 from google.appengine.ext import db
 from google.appengine.ext import webapp
 from google.appengine.ext.webapp import template
@@ -624,6 +622,9 @@

 def sha1_hmac(secret, data):
   """Returns the sha1 hmac for a chunk of data and a secret."""
+  # For Python 2.6, which can only compute hmacs on non-unicode data.
+  secret = utf8encoded(secret)
+  data = utf8encoded(data)
   return hmac.new(secret, data, hashlib.sha1).hexdigest()


@@ -1031,7 +1032,7 @@
     """
     def txn():
       if self.confirm_failures >= max_failures:
-        logging.warning('Max subscription failures exceeded, giving up.')
+        logging.debug('Max subscription failures exceeded, giving up.')
         return False
       else:
         retry_delay = retry_period * (2 ** self.confirm_failures)
@@ -1146,11 +1147,12 @@
     """
     def txn():
       if self.fetching_failures >= max_failures:
-        logging.warning('Max fetching failures exceeded, giving up.')
+        logging.debug('Max fetching failures exceeded, giving up.')
         self.totally_failed = True
       else:
         retry_delay = retry_period * (2 ** self.fetching_failures)
-        logging.warning('Fetching failed. Will retry in %s seconds', retry_delay)
+        logging.debug('Fetching failed. Will retry in %s seconds',
+                      retry_delay)
         self.eta = now() + datetime.timedelta(seconds=retry_delay)
         self.fetching_failures += 1
         self._enqueue_retry_task()
@@ -1317,9 +1319,12 @@
     if header_footer is not None and self.format != ARBITRARY:
       self.header_footer = header_footer

-  def get_request_headers(self):
+  def get_request_headers(self, subscriber_count):
     """Returns the request headers that should be used to pull this feed.

+    Args:
+      subscriber_count: The number of subscribers this feed has.
+
     Returns:
       Dictionary of request header values.
     """
@@ -1331,6 +1336,10 @@
       headers['If-Modified-Since'] = self.last_modified
     if self.etag:
       headers['If-None-Match'] = self.etag
+    if subscriber_count:
+      headers['User-Agent'] = (
+          'Public Hub (+http://pubsubhubbub.appspot.com; %d subscribers)' %
+          subscriber_count)
     return headers


@@ -1619,15 +1628,15 @@
         self.totally_failed = True

       if self.delivery_mode == EventToDeliver.NORMAL:
-        logging.warning('Normal delivery done; %d broken callbacks remain',
-                        len(self.failed_callbacks))
+        logging.debug('Normal delivery done; %d broken callbacks remain',
+                      len(self.failed_callbacks))
         self.delivery_mode = EventToDeliver.RETRY
       else:
-        logging.warning('End of attempt %d; topic = %s, subscribers = %d, '
-                        'waiting until %s or totally_failed = %s',
-                        self.retry_attempts, self.topic,
-                        len(self.failed_callbacks), self.last_modified,
-                        self.totally_failed)
+        logging.debug('End of attempt %d; topic = %s, subscribers = %d, '
+                      'waiting until %s or totally_failed = %s',
+                      self.retry_attempts, self.topic,
+                      len(self.failed_callbacks), self.last_modified,
+                      self.totally_failed)

     def txn():
       self.put()
@@ -1738,6 +1747,57 @@
     return result


+class KnownFeedStats(db.Model):
+  """Represents stats about a feed we know that exists.
+
+  Parent is the KnownFeed entity for a given topic URL.
+  """
+
+  subscriber_count = db.IntegerProperty()
+  update_time = db.DateTimeProperty(auto_now=True)
+
+  @classmethod
+  def create_key(cls, topic_url=None, topic_hash=None):
+    """Creates a key for a KnownFeedStats instance.
+
+    Args:
+      topic_url: The topic URL to create the key for.
+      topic_hash: The hash of the topic URL to create the key for. May only
+        be supplied if topic_url is None.
+
+    Returns:
+      db.Key of the KnownFeedStats instance.
+    """
+    if topic_url and topic_hash:
+      raise TypeError('Must specify topic_url or topic_hash.')
+    if topic_url:
+      topic_hash = sha1_hash(topic_url)
+
+    return db.Key.from_path(KnownFeed.kind(), topic_hash,
+                            cls.kind(), 'overall')
+
+  @classmethod
+  def get_or_create_all(cls, topic_list):
+    """Retrieves and/or creates KnownFeedStats entities for the supplied topics.
+
+    Args:
+      topic_list: List of topics to retrieve.
+
+    Returns:
+      The list of KnownFeedStats corresponding to the input topic list in
+      the same order they were supplied.
+    """
+    key_list = [cls.create_key(t) for t in topic_list]
+    found_list = db.get(key_list)
+    results = []
+    for topic, key, found in zip(topic_list, key_list, found_list):
+      if found:
+        results.append(found)
+      else:
+        results.append(cls(key=key, subscriber_count=0))
+    return results
+
+
 class PollingMarker(db.Model):
   """Keeps track of the current position in the bootstrap polling process."""

@@ -1966,9 +2026,9 @@
                               deadline=MAX_FETCH_SECONDS)
   except urlfetch_errors.Error:
     error_traceback = traceback.format_exc()
-    logging.warning('Error encountered while confirming subscription '
-                    'to %s for callback %s:\n%s',
-                    topic, callback, error_traceback)
+    logging.debug('Error encountered while confirming subscription '
+                  'to %s for callback %s:\n%s',
+                  topic, callback, error_traceback)
     return False

   if 200 <= response.status_code < 300 and response.content == challenge:
@@ -1989,9 +2049,9 @@
                  'topic = %s; subscription archived', callback, topic)
     return True
   else:
-    logging.warning('Could not confirm subscription; encountered '
-                    'status %d with content: %s', response.status_code,
-                    response.content)
+    logging.debug('Could not confirm subscription; encountered '
+                  'status %d with content: %s', response.status_code,
+                  response.content)
     return False


@@ -2086,8 +2146,9 @@
         return self.response.set_status(202)

     except (apiproxy_errors.Error, db.Error,
-            runtime.DeadlineExceededError, taskqueue.Error):
-      logging.exception('Could not verify subscription request')
+            runtime.DeadlineExceededError, taskqueue.Error), e:
+      logging.debug('Could not verify subscription request. %s: %s',
+                    e.__class__.__name__, e)
       self.response.headers['Retry-After'] = '120'
       return self.response.set_status(503)

@@ -2161,8 +2222,8 @@
     self.start_map(
         name='Reconfirm expiring subscriptions',
         handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
-        reader_spec='offline_jobs.HashKeyDatastoreInputReader',
-        reader_parameters=dict(
+        reader_spec='mapreduce.input_readers.DatastoreInputReader',
+        mapper_parameters=dict(
             processing_rate=100000,
             entity_kind='main.Subscription',
             threshold_timestamp=int(
@@ -2623,29 +2684,30 @@
     if not ready_feed_list:
       return

-    feed_record_list = FeedRecord.get_or_create_all(
-        [f.topic for f in ready_feed_list])
+    topic_list = [f.topic for f in ready_feed_list]
+    feed_record_list = FeedRecord.get_or_create_all(topic_list)
+    feed_stats_list = KnownFeedStats.get_or_create_all(topic_list)
     start_time = time.time()
     reporter = dos.Reporter()
     successful_topics = []
     failed_topics = []

-    def create_callback(feed_record, work, fetch_url, attempts):
+    def create_callback(feed_record, feed_stats, work, fetch_url, attempts):
       return lambda *args: callback(
-          feed_record, work, fetch_url, attempts, *args)
-
-    def callback(feed_record, work, fetch_url, attempts,
+          feed_record, feed_stats, work, fetch_url, attempts, *args)
+
+    def callback(feed_record, feed_stats, work, fetch_url, attempts,
                  status_code, headers, content, exception):
       should_parse = False
       fetch_success = False
       if exception:
         if isinstance(exception, urlfetch.ResponseTooLargeError):
-          logging.critical('Feed response too large for topic %r at url %r; '
-                           'skipping', work.topic, fetch_url)
+          logging.warning('Feed response too large for topic %r at url %r; '
+                          'skipping', work.topic, fetch_url)
           work.done()
         elif isinstance(exception, urlfetch.InvalidURLError):
-          logging.critical('Invalid redirection for topic %r to url %r; '
-                           'skipping', work.topic, fetch_url)
+          logging.warning('Invalid redirection for topic %r to url %r; '
+                          'skipping', work.topic, fetch_url)
           work.done()
         elif isinstance(exception, (apiproxy_errors.Error, urlfetch.Error)):
           logging.warning('Failed to fetch topic %r at url %r. %s: %s',
@@ -2663,13 +2725,17 @@
           logging.debug('Feed publisher for topic %r returned %d '
                         'redirect to %r', work.topic, status_code, fetch_url)
           if attempts >= MAX_REDIRECTS:
-            logging.warning('Too many redirects!')
+            logging.warning('Too many redirects for topic %r', work.topic)
             work.fetch_failed()
           else:
             # Recurse to do the refetch.
             hooks.execute(pull_feed_async,
-                work, fetch_url, feed_record.get_request_headers(), async_proxy,
-                create_callback(feed_record, work, fetch_url, attempts + 1))
+                work,
+                fetch_url,
+                feed_record.get_request_headers(feed_stats.subscriber_count),
+                async_proxy,
+                create_callback(feed_record, feed_stats, work, fetch_url,
+                                attempts + 1))
             return
         elif status_code == 304:
           logging.debug('Feed publisher for topic %r returned '
@@ -2677,9 +2743,9 @@
           work.done()
           fetch_success = True
         else:
-          logging.warning('Received bad response for topic = %r, '
-                          'status_code = %s, response_headers = %r',
-                          work.topic, status_code, headers)
+          logging.debug('Received bad response for topic = %r, '
+                        'status_code = %s, response_headers = %r',
+                        work.topic, status_code, headers)
           work.fetch_failed()

       # Fetch is done one way or another.
@@ -2700,10 +2766,14 @@
       # End callback

     # Fire off a fetch for every work item and wait for all callbacks.
-    for work, feed_record in zip(ready_feed_list, feed_record_list):
+    for work, feed_record, feed_stats in zip(
+        ready_feed_list, feed_record_list, feed_stats_list):
       hooks.execute(pull_feed_async,
-          work, work.topic, feed_record.get_request_headers(), async_proxy,
-          create_callback(feed_record, work, work.topic, 1))
+          work,
+          work.topic,
+          feed_record.get_request_headers(feed_stats.subscriber_count),
+          async_proxy,
+          create_callback(feed_record, feed_stats, work, work.topic, 1))

     try:
       async_proxy.wait()
@@ -2782,10 +2852,10 @@
       end_time = time.time()
       latency = int((end_time - start_time) * 1000)
       if exception or not (200 <= result.status_code <= 299):
-        logging.warning('Could not deliver to target url %s: '
-                        'Exception = %r, status_code = %s',
-                        sub.callback, exception,
-                        getattr(result, 'status_code', 'unknown'))
+        logging.debug('Could not deliver to target url %s: '
+                      'Exception = %r, status_code = %s',
+                      sub.callback, exception,
+                      getattr(result, 'status_code', 'unknown'))
         report_delivery(reporter, sub.callback, False, latency)
       else:
         failed_callbacks.remove(sub)
@@ -2984,8 +3054,9 @@

     try:
       response = urlfetch.fetch(topic)
-    except (apiproxy_errors.Error, urlfetch.Error):
-      logging.exception('Could not fetch topic = %s for feed ID', topic)
+    except (apiproxy_errors.Error, urlfetch.Error), e:
+      logging.warning('Could not fetch topic = %s for feed ID. %s: %s',
+                      topic, e.__class__.__name__, e)
       known_feed.put()
       return

@@ -3098,6 +3169,14 @@
             FETCH_URL_SAMPLE_DAY_LATENCY,
             single_key=topic_url),
       }
+
+      if users.is_current_user_admin():
+        feed_stats = db.get(KnownFeedStats.create_key(topic_url=topic_url))
+        if feed_stats:
+          context.update({
+            'subscriber_count': feed_stats.subscriber_count,
+            'feed_stats_update_time': feed_stats.update_time,
+          })

       fetch = FeedToFetch.get_by_topic(topic_url)
       if fetch:
=======================================
--- /trunk/hub/main_test.py     Mon Nov 15 15:47:16 2010
+++ /trunk/hub/main_test.py     Tue Jul 26 09:40:18 2011
@@ -1504,7 +1504,7 @@
 ################################################################################

 FeedRecord = main.FeedRecord
-
+KnownFeedStats = main.KnownFeedStats

 class PullFeedHandlerTest(testutil.HandlerTestBase):

@@ -1725,6 +1725,39 @@
     self.assertTrue(EventToDeliver.all().get() is None)
     testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)

+    self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
+
+  def testStatsUserAgent(self):
+    """Tests that the user agent string includes feed stats."""
+    info = FeedRecord.get_or_create(self.topic)
+    info.update(self.headers)
+    info.put()
+
+    KnownFeedStats(
+      key=KnownFeedStats.create_key(self.topic),
+      subscriber_count=123).put()
+
+    request_headers = {
+      'User-Agent':
+          'Public Hub (+http://pubsubhubbub.appspot.com; 123 subscribers)',
+    }
+
+    FeedToFetch.insert([self.topic])
+    self.entry_list = []
+    urlfetch_test_stub.instance.expect(
+        'get', self.topic, 200, self.expected_response,
+        request_headers=request_headers,
+        response_headers=self.headers)
+    self.run_fetch_task()
+    self.assertTrue(EventToDeliver.all().get() is None)
+    testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
+
+    record = FeedRecord.get_or_create(self.topic)
+    self.assertEquals(self.header_footer, record.header_footer)
+    self.assertEquals(self.etag, record.etag)
+    self.assertEquals(self.last_modified, record.last_modified)
+    self.assertEquals('application/atom+xml', record.content_type)
+
     self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))

   def testNoNewEntries(self):
@@ -2193,7 +2226,7 @@
     self.assertEquals(
         {'Connection': 'cache-control',
          'Cache-Control': 'no-cache no-store max-age=1'},
-        FeedRecord.all().get().get_request_headers())
+        FeedRecord.all().get().get_request_headers(0))

   def testPullGoodRss(self):
     """Tests when the RSS XML can parse just fine."""
@@ -3719,13 +3752,13 @@
     self.now = time.time()
     self.called = False
     def start_map(*args, **kwargs):
-      self.assertEquals(kwargs, {
+      self.assertEquals({
           'name': 'Reconfirm expiring subscriptions',
-          'reader_spec': 'offline_jobs.HashKeyDatastoreInputReader',
+          'reader_spec': 'mapreduce.input_readers.DatastoreInputReader',
           'queue_name': 'polling',
           'handler_spec': 'offline_jobs.SubscriptionReconfirmMapper.run',
           'shard_count': 4,
-          'reader_parameters': {
+          'mapper_parameters': {
             'entity_kind': 'main.Subscription',
             'processing_rate': 100000,
             'threshold_timestamp':
@@ -3735,7 +3768,7 @@
             'done_callback': '/work/cleanup_mapper',
             'done_callback_queue': 'polling',
           },
-      })
+      }, kwargs)
       self.called = True

     def create_handler():
@@ -3801,7 +3834,7 @@
         name='Reconfirm expiring subscriptions',
         handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
         reader_spec='mapreduce.input_readers.DatastoreInputReader',
-        reader_parameters=dict(
+        mapper_parameters=dict(
             processing_rate=100000,
             entity_kind='main.Subscription'))

=======================================
--- /trunk/hub/mapreduce.yaml   Tue Nov 16 13:41:28 2010
+++ /trunk/hub/mapreduce.yaml   Tue Jul 26 09:40:18 2011
@@ -1,15 +1,4 @@
 mapreduce:
-- name: Remove old properties from FeedEntryRecords
-  mapper:
-    input_reader: mapreduce.input_readers.DatastoreInputReader
-    handler: offline_jobs.RemoveOldFeedEntryRecordPropertiesMapper
-    params:
-    - name: entity_kind
-      default: main.FeedEntryRecord
-    - name: shard_count
-      default: 32
-    - name: processing_rate
-      default: 100000
 - name: Cleanup old EventToDeliver instances
   mapper:
     input_reader: mapreduce.input_readers.DatastoreInputReader
@@ -26,7 +15,7 @@
     params_validator: offline_jobs.CleanupOldEventToDeliver.validate_params
 - name: Reconfirm expiring subscriptions
   mapper:
-    input_reader: offline_jobs.HashKeyDatastoreInputReader
+    input_reader: mapreduce.input_readers.DatastoreInputReader
     handler: offline_jobs.SubscriptionReconfirmMapper.run
     params:
     - name: entity_kind
@@ -39,7 +28,7 @@
     params_validator: offline_jobs.SubscriptionReconfirmMapper.validate_params
 - name: Count subscribers by topic and callback pattern
   mapper:
-    input_reader: offline_jobs.HashKeyDatastoreInputReader
+    input_reader: mapreduce.input_readers.DatastoreInputReader
     handler: offline_jobs.CountSubscribers.run
     params:
     - name: entity_kind
=======================================
--- /trunk/hub/offline_jobs.py  Tue Nov 16 12:40:57 2010
+++ /trunk/hub/offline_jobs.py  Tue Jul 26 09:40:18 2011
@@ -29,20 +29,9 @@

 from mapreduce import context
 from mapreduce import input_readers
+from mapreduce import mapreduce_pipeline
 from mapreduce import operation as op
 from mapreduce import util
-from mapreduce.lib import key_range
-
-
-def RemoveOldFeedEntryRecordPropertiesMapper(feed_entry_record):
-  """Removes old properties from FeedEntryRecord instances."""
-  OLD_PROPERTIES = (
-      'entry_id_hash',
-      'entry_id')
-  for name in OLD_PROPERTIES:
-    if hasattr(feed_entry_record, name):
-      delattr(feed_entry_record, name)
-  yield op.db.Put(feed_entry_record)


 class CleanupOldEventToDeliver(object):
@@ -68,7 +57,7 @@


 class CountSubscribers(object):
-  """Mapper counts subscribers to a feed pattern by domain.
+  """Mapper counts active subscribers to a feed pattern by domain.

   Args:
     topic_pattern: Fully-matching regular expression pattern for topics to
@@ -96,60 +85,11 @@

     if self.topic_pattern.match(subscription.topic):
       the_match = self.callback_pattern.match(subscription.callback)
-      if the_match:
+      if (the_match and
+          subscription.subscription_state == main.Subscription.STATE_VERIFIED):
         yield op.counters.Increment(the_match.group(1))
-
-
-class HashKeyDatastoreInputReader(input_readers.DatastoreInputReader):
-  """A DatastoreInputReader that can split evenly across hash key ranges.
-
-  Assumes key names are in the format supplied by the main.get_hash_key_name
-  function.
-  """
-
-  @classmethod
-  def _split_input_from_namespace(
-      cls, app, namespace, entity_kind_name, shard_count):
-    entity_kind = util.for_name(entity_kind_name)
-    entity_kind_name = entity_kind.kind()
-
-    hex_key_start = db.Key.from_path(
-        entity_kind_name, 0)
-    hex_key_end = db.Key.from_path(
-        entity_kind_name, int('f' * 40, base=16))
-    hex_range = key_range.KeyRange(
-        hex_key_start, hex_key_end, None, True, True,
-        namespace=namespace,
-        _app=app)
-
-    key_range_list = [hex_range]
-    number_of_half_splits = int(math.floor(math.log(shard_count, 2)))
-    for index in xrange(0, number_of_half_splits):
-      new_ranges = []
-      for current_range in key_range_list:
-        new_ranges.extend(current_range.split_range(1))
-      key_range_list = new_ranges
-
-    adjusted_range_list = []
-    for current_range in key_range_list:
-      adjusted_range = key_range.KeyRange(
-          key_start=db.Key.from_path(
-              current_range.key_start.kind(),
-              'hash_%040x' % (current_range.key_start.id() or 0),
-              _app=current_range._app),
-          key_end=db.Key.from_path(
-              current_range.key_end.kind(),
-              'hash_%040x' % (current_range.key_end.id() or 0),
-              _app=current_range._app),
-          direction=current_range.direction,
-          include_start=current_range.include_start,
-          include_end=current_range.include_end,
-          namespace=current_range.namespace,
-          _app=current_range._app)
-
-      adjusted_range_list.append(adjusted_range)
-
-    return adjusted_range_list
+      elif the_match:
+        yield op.counters.Increment('matched but inactive')


 class SubscriptionReconfirmMapper(object):
@@ -174,3 +114,34 @@
     if sub.expiration_time < self.threshold_timestamp:
       sub.request_insert(sub.callback, sub.topic, sub.verify_token,
                          sub.secret, auto_reconfirm=True)
+
+
+def count_subscriptions_for_topic(subscription):
+  """Counts a Subscription instance if it's still active."""
+  print subscription.subscription_state
+  if subscription.subscription_state == main.Subscription.STATE_VERIFIED:
+    yield (subscription.topic_hash, '1')
+
+
+def save_subscription_counts_for_topic(topic_hash, counts):
+  """Sums subscriptions to a topic and saves a corresponding KnownFeedStat."""
+  total_count = len(counts)
+  entity = main.KnownFeedStats(
+      key=main.KnownFeedStats.create_key(topic_hash=topic_hash),
+      subscriber_count=total_count)
+  yield op.db.Put(entity)
+
+
+def start_count_subscriptions():
+  """Kicks off the MapReduce for determining and saving subscription counts."""
+  job = mapreduce_pipeline.MapreducePipeline(
+      'Count subscriptions',
+      'offline_jobs.count_subscriptions_for_topic',
+      'offline_jobs.save_subscription_counts_for_topic',
+      'mapreduce.input_readers.DatastoreInputReader',
+      mapper_params=dict(entity_kind='main.Subscription'),
+      shards=4)
+  # TODO(bslatkin): Pass through the queue name to run the job on. This is
+  # a limitation in the mapper library.
+  job.start()
+  return job.pipeline_id
=======================================
--- /trunk/hub/offline_jobs_test.py     Tue Nov 16 12:40:57 2010
+++ /trunk/hub/offline_jobs_test.py     Tue Jul 26 09:40:18 2011
@@ -37,145 +37,6 @@

 ################################################################################

-class HashKeyDatastoreInputReaderTest(unittest.TestCase):
-  """Tests for the HashKeyDatastoreInputReader."""
-
-  def setUp(self):
-    """Sets up the test harness."""
-    testutil.setup_for_testing()
-    self.app = 'my-app-id'
-    self.entity_kind = 'main.Subscription'
-    self.namespace = 'my-namespace'
-
-  def testOneShard(self):
-    """Tests just one shard."""
-    result = (
-        offline_jobs.HashKeyDatastoreInputReader._split_input_from_namespace(
-          self.app, self.namespace, self.entity_kind, 1))
-
-    expected = [
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_0000000000000000000000000000000000000000',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_ffffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='ASC',
-          include_start=True,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id')
-    ]
-    self.assertEquals(expected, result)
-
-  def testTwoShards(self):
-    """Tests two shares: one for number prefixes, one for letter prefixes."""
-    result = (
-        offline_jobs.HashKeyDatastoreInputReader._split_input_from_namespace(
-          self.app, self.namespace, self.entity_kind, 2))
-
-    expected = [
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_0000000000000000000000000000000000000000',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_7fffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='DESC',
-          include_start=True,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id'),
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_7fffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_ffffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='ASC',
-          include_start=False,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id'),
-    ]
-    self.assertEquals(expected, result)
-
-  def testManyShards(self):
-    """Tests having many shards with multiple levels of splits."""
-    result = (
-        offline_jobs.HashKeyDatastoreInputReader._split_input_from_namespace(
-          self.app, self.namespace, self.entity_kind, 4))
-
-    expected = [
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_0000000000000000000000000000000000000000',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_3fffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='DESC',
-          include_start=True,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id'),
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_3fffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_7fffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='ASC',
-          include_start=False,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id'),
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_7fffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_bfffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='DESC',
-          include_start=False,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id'),
-      key_range.KeyRange(
-          key_start=db.Key.from_path(
-              'Subscription',
-              u'hash_bfffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          key_end=db.Key.from_path(
-              'Subscription',
-              u'hash_ffffffffffffffffffffffffffffffffffffffff',
-              _app=u'my-app-id'),
-          direction='ASC',
-          include_start=False,
-          include_end=True,
-          namespace='my-namespace',
-          _app='my-app-id'),
-    ]
-    self.assertEquals(expected, result)
-
-
 Subscription = main.Subscription


@@ -341,6 +202,17 @@
     self.assertEquals(1, counter.delta)
     self.assertRaises(StopIteration, gen.next)

+  def testTopicMatch_CallbackMatch_Inactive(self):
+    """Tests when the subscription matches but is inactive."""
+    sub = self.get_subscription()
+    sub.subscription_state = Subscription.STATE_NOT_VERIFIED
+    sub.put()
+    gen = self.mapper.run(sub)
+    counter = gen.next()
+    self.assertEquals('matched but inactive', counter.counter_name)
+    self.assertEquals(1, counter.delta)
+    self.assertRaises(StopIteration, gen.next)
+
   def testTopicMatch_CallbackNoMatch(self):
     """Tests when the topic matches but the callback does not."""
     self.callback = 'some garbage'
@@ -355,6 +227,60 @@
     gen = self.mapper.run(sub)
     self.assertRaises(StopIteration, gen.next)

+################################################################################
+
+class SaveSubscriptionCountsTest(unittest.TestCase):
+  """Tests for the MapReduce that saves subscription counts."""
+
+  def setUp(self):
+    """Sets up the test harness."""
+    testutil.setup_for_testing()
+    self.callback = 'http://foo.callback-example.com/my-callback-url'
+    self.topic = 'http://example.com/my-topic-url'
+    self.token = 'token'
+    self.secret = 'my secrat'
+
+  def testMap(self):
+    """Tests the mapper function."""
+    self.assertTrue(Subscription.insert(
+        self.callback, self.topic, self.token, self.secret))
+    sub = Subscription.get_by_key_name(
+        Subscription.create_key_name(self.callback, self.topic))
+
+    # Active subscription.
+    it = offline_jobs.count_subscriptions_for_topic(sub)
+    self.assertEquals(('95ff66c343530c88a750cbc7fd1e0bbd8cc7bce2', '1'),
+                      it.next())
+    self.assertRaises(StopIteration, it.next)
+
+    # Not active
+    Subscription.archive(self.callback, self.topic)
+    sub = db.get(sub.key())
+    it = offline_jobs.count_subscriptions_for_topic(sub)
+    self.assertRaises(StopIteration, it.next)
+
+  def testReduce(self):
+    """Tests the reducer function."""
+    self.assertEquals(0, len(list(main.KnownFeedStats.all())))
+    it = offline_jobs.save_subscription_counts_for_topic(
+        '95ff66c343530c88a750cbc7fd1e0bbd8cc7bce2',
+        ['1'] * 321)
+    op = it.next()
+    self.assertEquals(
+        db.Key.from_path(
+            'KnownFeed', '95ff66c343530c88a750cbc7fd1e0bbd8cc7bce2',
+            'KnownFeedStats', 'overall'),
+        op.entity.key())
+    self.assertEquals(321, op.entity.subscriber_count)
+    self.assertRaises(StopIteration, it.next)
+
+  def testStart(self):
+    """Tests starting the mapreduce job."""
+    job_id = offline_jobs.start_count_subscriptions()
+    self.assertTrue(job_id is not None)
+    task = testutil.get_tasks('default', expected_count=1, index=0)
+    self.assertEquals('/mapreduce/pipeline/run', task['url'])
+
################################################################################

 if __name__ == '__main__':
=======================================
--- /trunk/hub/topic_details.html       Mon Mar 15 23:00:57 2010
+++ /trunk/hub/topic_details.html       Tue Jul 26 09:40:18 2011
@@ -42,6 +42,16 @@
     <td>Last Modified:</td>
     <td>{{last_modified|escape}}</td>
   </tr>
+  {% if subscriber_count %}
+  <tr>
+    <td>Active subscribers:</td>
+    <td>{{subscriber_count}}</td>
+  </tr>
+  <tr>
+    <td>Last count (UTC):</td>
+    <td>{{feed_stats_update_time|date:"Y-m-d\TH:i:s\Z"}}</td>
+  </tr>
+  {% endif %}
   <tr>
     <td>Fetch from domain:</td>
     <td>

Reply via email to