Revision: 407
Author: bslatkin
Date: Tue Jul 26 09:40:18 2011
Log: Adds a subscription-counting MapReduce; includes subscriber counts in the
User-Agent of feed fetch requests; updates all tests to run under a local
Python 2.6 environment
http://code.google.com/p/pubsubhubbub/source/detail?r=407
Added:
/trunk/hub/run_tests.sh
Deleted:
/trunk/hub/event_test.html
Modified:
/trunk/hub/app.yaml
/trunk/hub/dos.py
/trunk/hub/fork_join_queue.py
/trunk/hub/fork_join_queue_test.py
/trunk/hub/main.py
/trunk/hub/main_test.py
/trunk/hub/mapreduce.yaml
/trunk/hub/offline_jobs.py
/trunk/hub/offline_jobs_test.py
/trunk/hub/topic_details.html
=======================================
--- /dev/null
+++ /trunk/hub/run_tests.sh Tue Jul 26 09:40:18 2011
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Runs each *_test.py here and in ../nonstandard; stops at the first failure.
+shopt -s nullglob  # Unmatched globs expand to nothing, not the pattern text.
+
+for test_file in *_test.py ../nonstandard/*_test.py; do
+  echo "========== Running $test_file"
+  if ! "./$test_file"; then
+    echo "Died in $test_file"
+    exit 1
+  fi
+done
=======================================
--- /trunk/hub/event_test.html Fri Oct 3 00:27:55 2008
+++ /dev/null
@@ -1,28 +0,0 @@
-<html>
-<head>
- <title>Event test</title>
-</head>
-<body>
-
-<h1>Event test</h1>
-
-<form action="/event_test" method="post">
- <p>
- <div><label for="topic_url">Topic URL:</label></div>
- <input type="text" name="topic_url" id="topic_url"
value="{{topic_url}}" size="50"/>
- </p>
- <p>
- <div><label for="callback_urls">Callback URLs (one per
line):</topic></div>
- <textarea name="callback_urls" id="callback_urls" rows="8"
cols="50">{{callback_urls}}</textarea>
- </p>
- <p><input type="submit" value="Simulate"></p>
-</form>
-
-{% if info %}
-<pre>
-{{info}}
-</pre>
-{% endif %}
-
-</body>
-</html>
=======================================
--- /trunk/hub/app.yaml Sat Jun 5 18:18:50 2010
+++ /trunk/hub/app.yaml Tue Jul 26 09:40:18 2011
@@ -27,11 +27,24 @@
upload: favicon\.ico
secure: optional
+# Admin tools
- url: /remote_api
script: $PYTHON_LIB/google/appengine/ext/remote_api/handler.py
login: admin
secure: optional
+- url: /admin(/.*)?
+ script: $PYTHON_LIB/google/appengine/ext/admin
+ login: admin
+
+- url: /stats
+ script: main.py
+ login: admin
+
+- url: /mapreduce(/.*)?
+ script: mapreduce/main.py
+ login: admin
+
# Optional bookmarklet creation gadget.
- url: /bookmarklet(_jsonp\.min\.js|\.min\.js|\.html|_config\.html|
_gadget\.xml)
static_files: bookmarklet/bookmarklet\1
@@ -43,16 +56,7 @@
script: main.py
secure: always
-# Global stats are admin only.
-- url: /stats
- script: main.py
- login: admin
-
-# Mapreduce for running offline jobs.
-- url: /mapreduce(/.*)?
- script: mapreduce/main.py
- login: admin
-
+# Everything else
- url: .*
script: main.py
secure: optional
=======================================
--- /trunk/hub/dos.py Thu Mar 11 23:39:17 2010
+++ /trunk/hub/dos.py Tue Jul 26 09:40:18 2011
@@ -274,8 +274,8 @@
failed_keys = set(k for k, v in second_results.iteritems() if v is None)
if failed_keys:
- logging.critical('Failed memcache offset_or_add for prefix=%r,
keys=%r',
- prefix, failed_keys)
+ logging.warning('Failed memcache offset_or_add for prefix=%r, keys=%r',
+ prefix, failed_keys)
results.update(second_results)
return results
=======================================
--- /trunk/hub/fork_join_queue.py Sun Jul 11 22:57:08 2010
+++ /trunk/hub/fork_join_queue.py Tue Jul 26 09:40:18 2011
@@ -90,7 +90,7 @@
from google.net.proto import ProtocolBuffer
from google.appengine.api import memcache
-from google.appengine.api.labs import taskqueue
+from google.appengine.api import taskqueue
from google.appengine.ext import db
# TODO: Consider using multiple work indexes to alleviate the memcache
@@ -388,7 +388,8 @@
params={'cursor': cursor}
).add(self.get_queue_name(index))
break
- except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError):
+ except (taskqueue.TaskAlreadyExistsError,
+ taskqueue.TombstonedTaskError):
# This means the continuation chain already started and this root
# task failed for some reason; no problem.
break
=======================================
--- /trunk/hub/fork_join_queue_test.py Sun Jul 11 22:57:08 2010
+++ /trunk/hub/fork_join_queue_test.py Tue Jul 26 09:40:18 2011
@@ -293,7 +293,10 @@
work_index = TEST_QUEUE.next_index()
tasks = []
for i in xrange(6):
- tasks.append(TestModel(work_index=work_index, number=i))
+ # Simplify tests by assigning the key names of the TestModel, making
it
+ # so the values returned by pop_request() below are predictable.
+ key = db.Key.from_path(TestModel.kind(), i+1)
+ tasks.append(TestModel(key=key, work_index=work_index, number=i))
db.put(tasks)
TEST_QUEUE.add(work_index, gettime=self.gettime1)
@@ -397,13 +400,15 @@
"""Tests adding and popping from a sharded queue with continuation."""
from google.appengine.api import apiproxy_stub_map
stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')
- old_valid = stub._IsValidQueue
- stub._IsValidQueue = lambda *a, **k: True
+ stub._queues[None]._all_queues_valid = True
try:
work_index = SHARDED_QUEUE.next_index()
tasks = []
for i in xrange(5):
- tasks.append(TestModel(work_index=work_index, number=i))
+ # Simplify tests by assigning the key names of the TestModel,
making it
+ # so the values returned by pop_request() below are predictable.
+ key = db.Key.from_path(TestModel.kind(), i+1)
+ tasks.append(TestModel(key=key, work_index=work_index, number=i))
db.put(tasks)
SHARDED_QUEUE.add(work_index, gettime=self.gettime1)
queue_name = 'default-%d' % (1 + (work_index % 4))
@@ -427,7 +432,7 @@
self.assertTrue('cursor' in next_task['params'])
self.assertTrue(next_task['name'].endswith('-1'))
finally:
- stub._IsValidQueue = old_valid
+ stub._queues[None]._all_queues_valid = False
def testMemcacheQueue(self):
"""Tests adding and popping from an in-memory queue with
continuation."""
=======================================
--- /trunk/hub/main.py Mon Nov 15 15:47:16 2010
+++ /trunk/hub/main.py Tue Jul 26 09:40:18 2011
@@ -35,6 +35,9 @@
conjunction with KnownFeed to properly canonicalize feed aliases on
subscription and pinging.
+* KnownFeedStats: Statistics about a topic URL. Used to provide subscriber
+ counts to publishers on feed fetch.
+
* FeedRecord: Metadata information about a feed, the last time it was
polled,
and any headers that may affect future polling. Also contains any
debugging
information about the last feed fetch and why it may have failed.
@@ -75,11 +78,6 @@
# Bigger TODOs (in priority order)
#
-# - Add subscription counting to PushEventHandler so we can deliver a
header
-# with the number of subscribers the feed has. This will simply just keep
-# count of the subscribers seen so far and then when the pushing is done
it
-# will save that total back on the FeedRecord instance.
-#
# - Improve polling algorithm to keep stats on each feed.
#
# - Do not poll a feed if we've gotten an event from the publisher in less
@@ -105,8 +103,8 @@
from google.appengine.api import memcache
from google.appengine.api import urlfetch
from google.appengine.api import urlfetch_errors
+from google.appengine.api import taskqueue
from google.appengine.api import users
-from google.appengine.api.labs import taskqueue
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
@@ -624,6 +622,9 @@
def sha1_hmac(secret, data):
"""Returns the sha1 hmac for a chunk of data and a secret."""
+  # Python 2.6's hmac can only digest non-unicode (byte) strings; encode both.
+  secret = utf8encoded(secret)
+  data = utf8encoded(data)
return hmac.new(secret, data, hashlib.sha1).hexdigest()
@@ -1031,7 +1032,7 @@
"""
def txn():
if self.confirm_failures >= max_failures:
- logging.warning('Max subscription failures exceeded, giving up.')
+ logging.debug('Max subscription failures exceeded, giving up.')
return False
else:
retry_delay = retry_period * (2 ** self.confirm_failures)
@@ -1146,11 +1147,12 @@
"""
def txn():
if self.fetching_failures >= max_failures:
- logging.warning('Max fetching failures exceeded, giving up.')
+ logging.debug('Max fetching failures exceeded, giving up.')
self.totally_failed = True
else:
retry_delay = retry_period * (2 ** self.fetching_failures)
- logging.warning('Fetching failed. Will retry in %s seconds',
retry_delay)
+ logging.debug('Fetching failed. Will retry in %s seconds',
+ retry_delay)
self.eta = now() + datetime.timedelta(seconds=retry_delay)
self.fetching_failures += 1
self._enqueue_retry_task()
@@ -1317,9 +1319,12 @@
if header_footer is not None and self.format != ARBITRARY:
self.header_footer = header_footer
- def get_request_headers(self):
+ def get_request_headers(self, subscriber_count):
"""Returns the request headers that should be used to pull this feed.
+ Args:
+ subscriber_count: The number of subscribers this feed has.
+
Returns:
Dictionary of request header values.
"""
@@ -1331,6 +1336,10 @@
headers['If-Modified-Since'] = self.last_modified
if self.etag:
headers['If-None-Match'] = self.etag
+ if subscriber_count:
+ headers['User-Agent'] = (
+ 'Public Hub (+http://pubsubhubbub.appspot.com; %d subscribers)' %
+ subscriber_count)
return headers
@@ -1619,15 +1628,15 @@
self.totally_failed = True
if self.delivery_mode == EventToDeliver.NORMAL:
- logging.warning('Normal delivery done; %d broken callbacks remain',
- len(self.failed_callbacks))
+ logging.debug('Normal delivery done; %d broken callbacks remain',
+ len(self.failed_callbacks))
self.delivery_mode = EventToDeliver.RETRY
else:
- logging.warning('End of attempt %d; topic = %s, subscribers = %d, '
- 'waiting until %s or totally_failed = %s',
- self.retry_attempts, self.topic,
- len(self.failed_callbacks), self.last_modified,
- self.totally_failed)
+ logging.debug('End of attempt %d; topic = %s, subscribers = %d, '
+ 'waiting until %s or totally_failed = %s',
+ self.retry_attempts, self.topic,
+ len(self.failed_callbacks), self.last_modified,
+ self.totally_failed)
def txn():
self.put()
@@ -1738,6 +1747,60 @@
return result
+class KnownFeedStats(db.Model):
+  """Represents stats about a feed we know that exists.
+
+  Parent is the KnownFeed entity for a given topic URL.
+  """
+
+  subscriber_count = db.IntegerProperty()
+  update_time = db.DateTimeProperty(auto_now=True)
+
+  @classmethod
+  def create_key(cls, topic_url=None, topic_hash=None):
+    """Creates a key for a KnownFeedStats instance.
+
+    Args:
+      topic_url: The topic URL to create the key for.
+      topic_hash: The hash of the topic URL to create the key for. May only
+        be supplied if topic_url is None.
+
+    Returns:
+      db.Key of the KnownFeedStats instance.
+
+    Raises:
+      TypeError if both or neither of topic_url and topic_hash are supplied.
+    """
+    if bool(topic_url) == bool(topic_hash):
+      # With neither, the key name would be None; with both, it is ambiguous
+      # which identifier should win.
+      raise TypeError('Must specify exactly one of topic_url or topic_hash.')
+    if topic_url:
+      topic_hash = sha1_hash(topic_url)
+
+    return db.Key.from_path(KnownFeed.kind(), topic_hash,
+                            cls.kind(), 'overall')
+
+  @classmethod
+  def get_or_create_all(cls, topic_list):
+    """Gets or creates KnownFeedStats entities for the supplied topics.
+
+    Missing entities are created in memory with a subscriber_count of zero;
+    they are not saved to the Datastore here.
+
+    Args:
+      topic_list: List of topics to retrieve.
+
+    Returns:
+      The list of KnownFeedStats corresponding to the input topic list in
+      the same order they were supplied.
+    """
+    key_list = [cls.create_key(topic_url=t) for t in topic_list]
+    found_list = db.get(key_list)
+    return [found if found is not None else cls(key=key, subscriber_count=0)
+            for key, found in zip(key_list, found_list)]
+
+
class PollingMarker(db.Model):
"""Keeps track of the current position in the bootstrap polling
process."""
@@ -1966,9 +2026,9 @@
deadline=MAX_FETCH_SECONDS)
except urlfetch_errors.Error:
error_traceback = traceback.format_exc()
- logging.warning('Error encountered while confirming subscription '
- 'to %s for callback %s:\n%s',
- topic, callback, error_traceback)
+ logging.debug('Error encountered while confirming subscription '
+ 'to %s for callback %s:\n%s',
+ topic, callback, error_traceback)
return False
if 200 <= response.status_code < 300 and response.content == challenge:
@@ -1989,9 +2049,9 @@
'topic = %s; subscription archived', callback, topic)
return True
else:
- logging.warning('Could not confirm subscription; encountered '
- 'status %d with content: %s', response.status_code,
- response.content)
+ logging.debug('Could not confirm subscription; encountered '
+ 'status %d with content: %s', response.status_code,
+ response.content)
return False
@@ -2086,8 +2146,9 @@
return self.response.set_status(202)
except (apiproxy_errors.Error, db.Error,
- runtime.DeadlineExceededError, taskqueue.Error):
- logging.exception('Could not verify subscription request')
+ runtime.DeadlineExceededError, taskqueue.Error), e:
+ logging.debug('Could not verify subscription request. %s: %s',
+ e.__class__.__name__, e)
self.response.headers['Retry-After'] = '120'
return self.response.set_status(503)
@@ -2161,8 +2222,8 @@
self.start_map(
name='Reconfirm expiring subscriptions',
handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
- reader_spec='offline_jobs.HashKeyDatastoreInputReader',
- reader_parameters=dict(
+ reader_spec='mapreduce.input_readers.DatastoreInputReader',
+ mapper_parameters=dict(
processing_rate=100000,
entity_kind='main.Subscription',
threshold_timestamp=int(
@@ -2623,29 +2684,30 @@
if not ready_feed_list:
return
- feed_record_list = FeedRecord.get_or_create_all(
- [f.topic for f in ready_feed_list])
+ topic_list = [f.topic for f in ready_feed_list]
+ feed_record_list = FeedRecord.get_or_create_all(topic_list)
+ feed_stats_list = KnownFeedStats.get_or_create_all(topic_list)
start_time = time.time()
reporter = dos.Reporter()
successful_topics = []
failed_topics = []
- def create_callback(feed_record, work, fetch_url, attempts):
+ def create_callback(feed_record, feed_stats, work, fetch_url,
attempts):
return lambda *args: callback(
- feed_record, work, fetch_url, attempts, *args)
-
- def callback(feed_record, work, fetch_url, attempts,
+ feed_record, feed_stats, work, fetch_url, attempts, *args)
+
+ def callback(feed_record, feed_stats, work, fetch_url, attempts,
status_code, headers, content, exception):
should_parse = False
fetch_success = False
if exception:
if isinstance(exception, urlfetch.ResponseTooLargeError):
- logging.critical('Feed response too large for topic %r at
url %r; '
- 'skipping', work.topic, fetch_url)
+ logging.warning('Feed response too large for topic %r at
url %r; '
+ 'skipping', work.topic, fetch_url)
work.done()
elif isinstance(exception, urlfetch.InvalidURLError):
- logging.critical('Invalid redirection for topic %r to url %r; '
- 'skipping', work.topic, fetch_url)
+ logging.warning('Invalid redirection for topic %r to url %r; '
+ 'skipping', work.topic, fetch_url)
work.done()
elif isinstance(exception, (apiproxy_errors.Error,
urlfetch.Error)):
logging.warning('Failed to fetch topic %r at url %r. %s: %s',
@@ -2663,13 +2725,17 @@
logging.debug('Feed publisher for topic %r returned %d '
'redirect to %r', work.topic, status_code,
fetch_url)
if attempts >= MAX_REDIRECTS:
- logging.warning('Too many redirects!')
+ logging.warning('Too many redirects for topic %r', work.topic)
work.fetch_failed()
else:
# Recurse to do the refetch.
hooks.execute(pull_feed_async,
- work, fetch_url, feed_record.get_request_headers(),
async_proxy,
- create_callback(feed_record, work, fetch_url, attempts +
1))
+ work,
+ fetch_url,
+
feed_record.get_request_headers(feed_stats.subscriber_count),
+ async_proxy,
+ create_callback(feed_record, feed_stats, work, fetch_url,
+ attempts + 1))
return
elif status_code == 304:
logging.debug('Feed publisher for topic %r returned '
@@ -2677,9 +2743,9 @@
work.done()
fetch_success = True
else:
- logging.warning('Received bad response for topic = %r, '
- 'status_code = %s, response_headers = %r',
- work.topic, status_code, headers)
+ logging.debug('Received bad response for topic = %r, '
+ 'status_code = %s, response_headers = %r',
+ work.topic, status_code, headers)
work.fetch_failed()
# Fetch is done one way or another.
@@ -2700,10 +2766,14 @@
# End callback
# Fire off a fetch for every work item and wait for all callbacks.
- for work, feed_record in zip(ready_feed_list, feed_record_list):
+ for work, feed_record, feed_stats in zip(
+ ready_feed_list, feed_record_list, feed_stats_list):
hooks.execute(pull_feed_async,
- work, work.topic, feed_record.get_request_headers(), async_proxy,
- create_callback(feed_record, work, work.topic, 1))
+ work,
+ work.topic,
+ feed_record.get_request_headers(feed_stats.subscriber_count),
+ async_proxy,
+ create_callback(feed_record, feed_stats, work, work.topic, 1))
try:
async_proxy.wait()
@@ -2782,10 +2852,10 @@
end_time = time.time()
latency = int((end_time - start_time) * 1000)
if exception or not (200 <= result.status_code <= 299):
- logging.warning('Could not deliver to target url %s: '
- 'Exception = %r, status_code = %s',
- sub.callback, exception,
- getattr(result, 'status_code', 'unknown'))
+ logging.debug('Could not deliver to target url %s: '
+ 'Exception = %r, status_code = %s',
+ sub.callback, exception,
+ getattr(result, 'status_code', 'unknown'))
report_delivery(reporter, sub.callback, False, latency)
else:
failed_callbacks.remove(sub)
@@ -2984,8 +3054,9 @@
try:
response = urlfetch.fetch(topic)
- except (apiproxy_errors.Error, urlfetch.Error):
- logging.exception('Could not fetch topic = %s for feed ID', topic)
+ except (apiproxy_errors.Error, urlfetch.Error), e:
+ logging.warning('Could not fetch topic = %s for feed ID. %s: %s',
+ topic, e.__class__.__name__, e)
known_feed.put()
return
@@ -3098,6 +3169,14 @@
FETCH_URL_SAMPLE_DAY_LATENCY,
single_key=topic_url),
}
+
+ if users.is_current_user_admin():
+ feed_stats = db.get(KnownFeedStats.create_key(topic_url=topic_url))
+ if feed_stats:
+ context.update({
+ 'subscriber_count': feed_stats.subscriber_count,
+ 'feed_stats_update_time': feed_stats.update_time,
+ })
fetch = FeedToFetch.get_by_topic(topic_url)
if fetch:
=======================================
--- /trunk/hub/main_test.py Mon Nov 15 15:47:16 2010
+++ /trunk/hub/main_test.py Tue Jul 26 09:40:18 2011
@@ -1504,7 +1504,7 @@
################################################################################
FeedRecord = main.FeedRecord
-
+KnownFeedStats = main.KnownFeedStats
class PullFeedHandlerTest(testutil.HandlerTestBase):
@@ -1725,6 +1725,39 @@
self.assertTrue(EventToDeliver.all().get() is None)
testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
+ self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
+
+  def testStatsUserAgent(self):
+    """Tests that the user agent string includes feed stats."""
+    info = FeedRecord.get_or_create(self.topic)
+    info.update(self.headers)
+    info.put()
+
+    KnownFeedStats(  # Stored count the fetch should report.
+        key=KnownFeedStats.create_key(self.topic),
+        subscriber_count=123).put()
+
+    request_headers = {  # Headers the urlfetch stub expects on the fetch.
+      'User-Agent':
+        'Public Hub (+http://pubsubhubbub.appspot.com; 123 subscribers)',
+    }
+
+    FeedToFetch.insert([self.topic])
+    self.entry_list = []
+    urlfetch_test_stub.instance.expect(
+        'get', self.topic, 200, self.expected_response,
+        request_headers=request_headers,
+        response_headers=self.headers)
+    self.run_fetch_task()
+    self.assertTrue(EventToDeliver.all().get() is None)
+    testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
+
+    record = FeedRecord.get_or_create(self.topic)
+    self.assertEquals(self.header_footer, record.header_footer)
+    self.assertEquals(self.etag, record.etag)
+    self.assertEquals(self.last_modified, record.last_modified)
+    self.assertEquals('application/atom+xml', record.content_type)
+
self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
def testNoNewEntries(self):
@@ -2193,7 +2226,7 @@
self.assertEquals(
{'Connection': 'cache-control',
'Cache-Control': 'no-cache no-store max-age=1'},
- FeedRecord.all().get().get_request_headers())
+ FeedRecord.all().get().get_request_headers(0))
def testPullGoodRss(self):
"""Tests when the RSS XML can parse just fine."""
@@ -3719,13 +3752,13 @@
self.now = time.time()
self.called = False
def start_map(*args, **kwargs):
- self.assertEquals(kwargs, {
+ self.assertEquals({
'name': 'Reconfirm expiring subscriptions',
- 'reader_spec': 'offline_jobs.HashKeyDatastoreInputReader',
+ 'reader_spec': 'mapreduce.input_readers.DatastoreInputReader',
'queue_name': 'polling',
'handler_spec': 'offline_jobs.SubscriptionReconfirmMapper.run',
'shard_count': 4,
- 'reader_parameters': {
+ 'mapper_parameters': {
'entity_kind': 'main.Subscription',
'processing_rate': 100000,
'threshold_timestamp':
@@ -3735,7 +3768,7 @@
'done_callback': '/work/cleanup_mapper',
'done_callback_queue': 'polling',
},
- })
+ }, kwargs)
self.called = True
def create_handler():
@@ -3801,7 +3834,7 @@
name='Reconfirm expiring subscriptions',
handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
reader_spec='mapreduce.input_readers.DatastoreInputReader',
- reader_parameters=dict(
+ mapper_parameters=dict(
processing_rate=100000,
entity_kind='main.Subscription'))
=======================================
--- /trunk/hub/mapreduce.yaml Tue Nov 16 13:41:28 2010
+++ /trunk/hub/mapreduce.yaml Tue Jul 26 09:40:18 2011
@@ -1,15 +1,4 @@
mapreduce:
-- name: Remove old properties from FeedEntryRecords
- mapper:
- input_reader: mapreduce.input_readers.DatastoreInputReader
- handler: offline_jobs.RemoveOldFeedEntryRecordPropertiesMapper
- params:
- - name: entity_kind
- default: main.FeedEntryRecord
- - name: shard_count
- default: 32
- - name: processing_rate
- default: 100000
- name: Cleanup old EventToDeliver instances
mapper:
input_reader: mapreduce.input_readers.DatastoreInputReader
@@ -26,7 +15,7 @@
params_validator: offline_jobs.CleanupOldEventToDeliver.validate_params
- name: Reconfirm expiring subscriptions
mapper:
- input_reader: offline_jobs.HashKeyDatastoreInputReader
+ input_reader: mapreduce.input_readers.DatastoreInputReader
handler: offline_jobs.SubscriptionReconfirmMapper.run
params:
- name: entity_kind
@@ -39,7 +28,7 @@
params_validator:
offline_jobs.SubscriptionReconfirmMapper.validate_params
- name: Count subscribers by topic and callback pattern
mapper:
- input_reader: offline_jobs.HashKeyDatastoreInputReader
+ input_reader: mapreduce.input_readers.DatastoreInputReader
handler: offline_jobs.CountSubscribers.run
params:
- name: entity_kind
=======================================
--- /trunk/hub/offline_jobs.py Tue Nov 16 12:40:57 2010
+++ /trunk/hub/offline_jobs.py Tue Jul 26 09:40:18 2011
@@ -29,20 +29,9 @@
from mapreduce import context
from mapreduce import input_readers
+from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import util
-from mapreduce.lib import key_range
-
-
-def RemoveOldFeedEntryRecordPropertiesMapper(feed_entry_record):
- """Removes old properties from FeedEntryRecord instances."""
- OLD_PROPERTIES = (
- 'entry_id_hash',
- 'entry_id')
- for name in OLD_PROPERTIES:
- if hasattr(feed_entry_record, name):
- delattr(feed_entry_record, name)
- yield op.db.Put(feed_entry_record)
class CleanupOldEventToDeliver(object):
@@ -68,7 +57,7 @@
class CountSubscribers(object):
- """Mapper counts subscribers to a feed pattern by domain.
+ """Mapper counts active subscribers to a feed pattern by domain.
Args:
topic_pattern: Fully-matching regular expression pattern for topics to
@@ -96,60 +85,11 @@
if self.topic_pattern.match(subscription.topic):
the_match = self.callback_pattern.match(subscription.callback)
- if the_match:
+ if (the_match and
+ subscription.subscription_state ==
main.Subscription.STATE_VERIFIED):
yield op.counters.Increment(the_match.group(1))
-
-
-class HashKeyDatastoreInputReader(input_readers.DatastoreInputReader):
- """A DatastoreInputReader that can split evenly across hash key ranges.
-
- Assumes key names are in the format supplied by the
main.get_hash_key_name
- function.
- """
-
- @classmethod
- def _split_input_from_namespace(
- cls, app, namespace, entity_kind_name, shard_count):
- entity_kind = util.for_name(entity_kind_name)
- entity_kind_name = entity_kind.kind()
-
- hex_key_start = db.Key.from_path(
- entity_kind_name, 0)
- hex_key_end = db.Key.from_path(
- entity_kind_name, int('f' * 40, base=16))
- hex_range = key_range.KeyRange(
- hex_key_start, hex_key_end, None, True, True,
- namespace=namespace,
- _app=app)
-
- key_range_list = [hex_range]
- number_of_half_splits = int(math.floor(math.log(shard_count, 2)))
- for index in xrange(0, number_of_half_splits):
- new_ranges = []
- for current_range in key_range_list:
- new_ranges.extend(current_range.split_range(1))
- key_range_list = new_ranges
-
- adjusted_range_list = []
- for current_range in key_range_list:
- adjusted_range = key_range.KeyRange(
- key_start=db.Key.from_path(
- current_range.key_start.kind(),
- 'hash_%040x' % (current_range.key_start.id() or 0),
- _app=current_range._app),
- key_end=db.Key.from_path(
- current_range.key_end.kind(),
- 'hash_%040x' % (current_range.key_end.id() or 0),
- _app=current_range._app),
- direction=current_range.direction,
- include_start=current_range.include_start,
- include_end=current_range.include_end,
- namespace=current_range.namespace,
- _app=current_range._app)
-
- adjusted_range_list.append(adjusted_range)
-
- return adjusted_range_list
+ elif the_match:
+ yield op.counters.Increment('matched but inactive')
class SubscriptionReconfirmMapper(object):
@@ -174,3 +114,46 @@
if sub.expiration_time < self.threshold_timestamp:
sub.request_insert(sub.callback, sub.topic, sub.verify_token,
sub.secret, auto_reconfirm=True)
+
+
+def count_subscriptions_for_topic(subscription):
+  """Map function: yields (topic_hash, '1') for each active Subscription.
+
+  Only subscriptions in STATE_VERIFIED are counted; anything else (e.g. an
+  archived or not-yet-verified subscription) yields nothing.
+  """
+  if subscription.subscription_state == main.Subscription.STATE_VERIFIED:
+    yield (subscription.topic_hash, '1')
+
+
+def save_subscription_counts_for_topic(topic_hash, counts):
+  """Reduce function: sums a topic's counts and saves its KnownFeedStats.
+
+  Args:
+    topic_hash: Hash of the topic URL, as yielded by the map function.
+    counts: List of count strings (normally '1') yielded for this topic.
+  """
+  total_count = sum(int(count) for count in counts)
+  entity = main.KnownFeedStats(
+      key=main.KnownFeedStats.create_key(topic_hash=topic_hash),
+      subscriber_count=total_count)
+  yield op.db.Put(entity)
+
+
+def start_count_subscriptions():
+  """Kicks off the MapReduce for determining and saving subscription counts.
+
+  Returns:
+    The pipeline ID of the started MapreducePipeline.
+  """
+  job = mapreduce_pipeline.MapreducePipeline(
+      'Count subscriptions',
+      'offline_jobs.count_subscriptions_for_topic',
+      'offline_jobs.save_subscription_counts_for_topic',
+      'mapreduce.input_readers.DatastoreInputReader',
+      mapper_params=dict(entity_kind='main.Subscription'),
+      shards=4)
+  # TODO(bslatkin): Pass through the queue name to run the job on. This is
+  # a limitation in the mapper library.
+  job.start()
+  return job.pipeline_id
=======================================
--- /trunk/hub/offline_jobs_test.py Tue Nov 16 12:40:57 2010
+++ /trunk/hub/offline_jobs_test.py Tue Jul 26 09:40:18 2011
@@ -37,145 +37,6 @@
################################################################################
-class HashKeyDatastoreInputReaderTest(unittest.TestCase):
- """Tests for the HashKeyDatastoreInputReader."""
-
- def setUp(self):
- """Sets up the test harness."""
- testutil.setup_for_testing()
- self.app = 'my-app-id'
- self.entity_kind = 'main.Subscription'
- self.namespace = 'my-namespace'
-
- def testOneShard(self):
- """Tests just one shard."""
- result = (
-
offline_jobs.HashKeyDatastoreInputReader._split_input_from_namespace(
- self.app, self.namespace, self.entity_kind, 1))
-
- expected = [
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_0000000000000000000000000000000000000000',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_ffffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='ASC',
- include_start=True,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id')
- ]
- self.assertEquals(expected, result)
-
- def testTwoShards(self):
- """Tests two shares: one for number prefixes, one for letter
prefixes."""
- result = (
-
offline_jobs.HashKeyDatastoreInputReader._split_input_from_namespace(
- self.app, self.namespace, self.entity_kind, 2))
-
- expected = [
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_0000000000000000000000000000000000000000',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_7fffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='DESC',
- include_start=True,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id'),
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_7fffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_ffffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='ASC',
- include_start=False,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id'),
- ]
- self.assertEquals(expected, result)
-
- def testManyShards(self):
- """Tests having many shards with multiple levels of splits."""
- result = (
-
offline_jobs.HashKeyDatastoreInputReader._split_input_from_namespace(
- self.app, self.namespace, self.entity_kind, 4))
-
- expected = [
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_0000000000000000000000000000000000000000',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_3fffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='DESC',
- include_start=True,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id'),
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_3fffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_7fffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='ASC',
- include_start=False,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id'),
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_7fffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_bfffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='DESC',
- include_start=False,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id'),
- key_range.KeyRange(
- key_start=db.Key.from_path(
- 'Subscription',
- u'hash_bfffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- key_end=db.Key.from_path(
- 'Subscription',
- u'hash_ffffffffffffffffffffffffffffffffffffffff',
- _app=u'my-app-id'),
- direction='ASC',
- include_start=False,
- include_end=True,
- namespace='my-namespace',
- _app='my-app-id'),
- ]
- self.assertEquals(expected, result)
-
-
Subscription = main.Subscription
@@ -341,6 +202,17 @@
self.assertEquals(1, counter.delta)
self.assertRaises(StopIteration, gen.next)
+  def testTopicMatch_CallbackMatch_Inactive(self):
+    """Tests when the subscription matches but is inactive."""
+    sub = self.get_subscription()
+    sub.subscription_state = Subscription.STATE_NOT_VERIFIED  # Not active.
+    sub.put()
+    gen = self.mapper.run(sub)
+    counter = gen.next()
+    self.assertEquals('matched but inactive', counter.counter_name)
+    self.assertEquals(1, counter.delta)
+    self.assertRaises(StopIteration, gen.next)
+
def testTopicMatch_CallbackNoMatch(self):
"""Tests when the topic matches but the callback does not."""
self.callback = 'some garbage'
@@ -355,6 +227,60 @@
gen = self.mapper.run(sub)
self.assertRaises(StopIteration, gen.next)
+################################################################################
+
+class SaveSubscriptionCountsTest(unittest.TestCase):
+  """Tests for the MapReduce that saves subscription counts."""
+
+  def setUp(self):
+    """Sets up the test harness."""
+    testutil.setup_for_testing()
+    self.callback = 'http://foo.callback-example.com/my-callback-url'
+    self.topic = 'http://example.com/my-topic-url'
+    self.token = 'token'
+    self.secret = 'my secrat'
+
+  def testMap(self):
+    """Tests the mapper function."""
+    self.assertTrue(Subscription.insert(
+        self.callback, self.topic, self.token, self.secret))
+    sub = Subscription.get_by_key_name(
+        Subscription.create_key_name(self.callback, self.topic))
+
+    # Active subscription: yields (topic_hash, '1') for the topic.
+    it = offline_jobs.count_subscriptions_for_topic(sub)
+    self.assertEquals(('95ff66c343530c88a750cbc7fd1e0bbd8cc7bce2', '1'),
+                      it.next())
+    self.assertRaises(StopIteration, it.next)
+
+    # Not active: an archived subscription yields nothing.
+    Subscription.archive(self.callback, self.topic)
+    sub = db.get(sub.key())
+    it = offline_jobs.count_subscriptions_for_topic(sub)
+    self.assertRaises(StopIteration, it.next)
+
+  def testReduce(self):
+    """Tests the reducer function."""
+    self.assertEquals(0, len(list(main.KnownFeedStats.all())))
+    it = offline_jobs.save_subscription_counts_for_topic(
+        '95ff66c343530c88a750cbc7fd1e0bbd8cc7bce2',
+        ['1'] * 321)  # One '1' per active subscription.
+    op = it.next()
+    self.assertEquals(
+        db.Key.from_path(
+            'KnownFeed', '95ff66c343530c88a750cbc7fd1e0bbd8cc7bce2',
+            'KnownFeedStats', 'overall'),
+        op.entity.key())
+    self.assertEquals(321, op.entity.subscriber_count)
+    self.assertRaises(StopIteration, it.next)
+
+  def testStart(self):
+    """Tests starting the mapreduce job."""
+    job_id = offline_jobs.start_count_subscriptions()
+    self.assertTrue(job_id is not None)
+    task = testutil.get_tasks('default', expected_count=1, index=0)
+    self.assertEquals('/mapreduce/pipeline/run', task['url'])
+
################################################################################
if __name__ == '__main__':
=======================================
--- /trunk/hub/topic_details.html Mon Mar 15 23:00:57 2010
+++ /trunk/hub/topic_details.html Tue Jul 26 09:40:18 2011
@@ -42,6 +42,16 @@
<td>Last Modified:</td>
<td>{{last_modified|escape}}</td>
</tr>
+ {% if subscriber_count %}
+ <tr>
+ <td>Active subscribers:</td>
+ <td>{{subscriber_count}}</td>
+ </tr>
+ <tr>
+ <td>Last count (UTC):</td>
+ <td>{{feed_stats_update_time|date:"Y-m-d\TH:i:s\Z"}}</td>
+ </tr>
+ {% endif %}
<tr>
<td>Fetch from domain:</td>
<td>