Revision: 387
Author: bslatkin
Date: Wed Sep 22 15:16:49 2010
Log: use mapper framework for subscription reconfirmation
http://code.google.com/p/pubsubhubbub/source/detail?r=387
Modified:
/trunk/hub
/trunk/hub/async_apiproxy.py
/trunk/hub/index.yaml
/trunk/hub/main.py
/trunk/hub/main_test.py
/trunk/hub/mapreduce.yaml
/trunk/hub/offline_jobs.py
=======================================
--- /trunk/hub/async_apiproxy.py Sun Jul 11 22:58:45 2010
+++ /trunk/hub/async_apiproxy.py Wed Sep 22 15:16:49 2010
@@ -126,10 +126,8 @@
def wait(self):
"""Wait for RPCs to finish. Returns True if any were processed."""
- while self.enqueued:
+ while self.enqueued or self.complete:
# Run the callbacks before even waiting, because a response could have
# come back during any outbound API call.
self._run_callbacks()
self._wait_one()
- # Run them one last time after waiting to pick up the final callback!
- self._run_callbacks()
=======================================
--- /trunk/hub/index.yaml Wed Feb 3 09:37:28 2010
+++ /trunk/hub/index.yaml Wed Sep 22 15:16:49 2010
@@ -24,3 +24,8 @@
# manually, move them above the marker line. The index.yaml file is
# automatically uploaded to the admin console when you next deploy
# your application using appcfg.py.
+
+- kind: Subscription
+ properties:
+ - name: __key__
+ direction: desc
=======================================
--- /trunk/hub/main.py Sun Jul 11 22:57:08 2010
+++ /trunk/hub/main.py Wed Sep 22 15:16:49 2010
@@ -119,6 +119,9 @@
import fork_join_queue
import urlfetch_async
+import mapreduce.control
+import mapreduce.model
+
async_proxy = async_apiproxy.AsyncAPIProxy()
################################################################################
@@ -168,17 +171,17 @@
# How far before expiration to refresh subscriptions.
SUBSCRIPTION_CHECK_BUFFER_SECONDS = (24 * 60 * 60) # 24 hours
-# How many subscriber checking tasks to scheudle at a time.
-SUBSCRIPTION_CHECK_CHUNK_SIZE = 200
+# How many mapper shards to use for reconfirming subscriptions.
+SUBSCRIPTION_RECONFIRM_SHARD_COUNT = 4
# How often to poll feeds.
POLLING_BOOTSTRAP_PERIOD = 10800 # in seconds; 3 hours
# Default expiration time of a lease.
-DEFAULT_LEASE_SECONDS = (30 * 24 * 60 * 60) # 30 days
+DEFAULT_LEASE_SECONDS = (5 * 24 * 60 * 60) # 5 days
# Maximum expiration time of a lease.
-MAX_LEASE_SECONDS = DEFAULT_LEASE_SECONDS * 3 # 90 days
+MAX_LEASE_SECONDS = (10 * 24 * 60 * 60) # 10 days
# Maximum number of redirects to follow when feed fetching.
MAX_REDIRECTS = 7
@@ -2111,62 +2114,41 @@
class SubscriptionReconfirmHandler(webapp.RequestHandler):
"""Periodic handler causes reconfirmation for almost expired
subscriptions."""
- def __init__(self, now=time.time):
+ def __init__(self, now=time.time, start_map=mapreduce.control.start_map):
"""Initializer."""
webapp.RequestHandler.__init__(self)
self.now = now
+ self.start_map = start_map
@work_queue_only
def get(self):
- threshold_timestamp = str(
- int(self.now() - SUBSCRIPTION_CHECK_BUFFER_SECONDS))
- # NOTE: See PollBootstrapHandler as to why we need a named task here for
- # the first insertion and the rest of the sequence.
- name = 'reconfirm-' + threshold_timestamp
+ # Use the name, such that only one of these tasks runs per calendar day.
+ name = 'reconfirm-%s' % time.strftime('%Y-%m-%d', time.gmtime(self.now()))
try:
taskqueue.Task(
url='/work/reconfirm_subscriptions',
- name=name,
- params=dict(time_offset=threshold_timestamp)
+ name=name
).add(POLLING_QUEUE)
except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError):
- logging.exception('Could not enqueue FIRST reconfirmation task')
+ logging.exception('Could not enqueue FIRST reconfirmation task; '
+ 'must have already run today.')
@work_queue_only
def post(self):
- time_offset = self.request.get('time_offset')
- datetime_offset = datetime.datetime.utcfromtimestamp(int(time_offset))
- key_offset = self.request.get('key_offset')
- logging.info('Handling reconfirmations for time_offset = %s, '
- 'current_key = %s', datetime_offset, key_offset)
-
- query = (Subscription.all()
- .filter('subscription_state =', Subscription.STATE_VERIFIED)
- .order('__key__'))
- if key_offset:
- query.filter('__key__ >', db.Key(key_offset))
-
- subscriptions = query.fetch(SUBSCRIPTION_CHECK_CHUNK_SIZE)
- if not subscriptions:
- logging.info('All done with periodic subscription reconfirmations')
- return
-
- next_key = str(subscriptions[-1].key())
- try:
- taskqueue.Task(
- url='/work/reconfirm_subscriptions',
- name='reconfirm-%s-%s' % (time_offset, sha1_hash(next_key)),
- params=dict(time_offset=time_offset,
- key_offset=next_key)).add(POLLING_QUEUE)
- except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError):
- logging.exception('Continued polling task already present; '
- 'this work has already been done')
- return
-
- for sub in subscriptions:
- if sub.expiration_time < datetime_offset:
- sub.request_insert(sub.callback, sub.topic, sub.verify_token,
- sub.secret, auto_reconfirm=True)
+ self.start_map(
+ name='Reconfirm expiring subscriptions',
+ handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
+ reader_spec='mapreduce.input_readers.DatastoreInputReader',
+ reader_parameters=dict(
+ processing_rate=100000,
+ entity_kind='main.Subscription'),
+ shard_count=SUBSCRIPTION_RECONFIRM_SHARD_COUNT,
+ queue_name=POLLING_QUEUE,
+ mapreduce_parameters=dict(
+ threshold_timestamp=int(
+ self.now() + SUBSCRIPTION_CHECK_BUFFER_SECONDS),
+ done_callback='/work/cleanup_mapper',
+ done_callback_queue=POLLING_QUEUE))
class SubscriptionCleanupHandler(webapp.RequestHandler):
@@ -2184,6 +2166,19 @@
except (db.Error, apiproxy_errors.Error,
runtime.DeadlineExceededError):
logging.exception('Could not clean-up Subscription instances')
+
+class CleanupMapperHandler(webapp.RequestHandler):
+ """Cleans up all data from a Mapper job run."""
+
+ @work_queue_only
+ def post(self):
+ mapreduce_id = self.request.headers.get('mapreduce-id')
+ # TODO: Use Mapper Cleanup API once available.
+ db.delete(mapreduce.model.MapreduceControl.get_key_by_job_id(mapreduce_id))
+ shards = mapreduce.model.ShardState.find_by_mapreduce_id(mapreduce_id)
+ db.delete(shards)
+ db.delete(mapreduce.model.MapreduceState.get_key_by_job_id(mapreduce_id))
+
################################################################################
# Publishing handlers
@@ -2411,7 +2406,6 @@
getattr(response, 'headers', None),
getattr(response, 'content', None),
exception)
-
urlfetch_async.fetch(fetch_url,
headers=headers,
follow_redirects=False,
@@ -3445,7 +3439,8 @@
(r'/work/poll_bootstrap', PollBootstrapHandler),
(r'/work/event_cleanup', EventCleanupHandler),
(r'/work/subscription_cleanup', SubscriptionCleanupHandler),
- (r'/work/reconfirm_subscriptions', SubscriptionReconfirmHandler)
+ (r'/work/reconfirm_subscriptions', SubscriptionReconfirmHandler),
+ (r'/work/cleanup_mapper', CleanupMapperHandler),
])
application = webapp.WSGIApplication(HANDLERS, debug=DEBUG)
wsgiref.handlers.CGIHandler().run(application)
=======================================
--- /trunk/hub/main_test.py Sun Jul 11 22:24:18 2010
+++ /trunk/hub/main_test.py Wed Sep 22 15:16:49 2010
@@ -45,6 +45,9 @@
import main
import urlfetch_test_stub
+import mapreduce.control
+import mapreduce.model
+
################################################################################
# For convenience
@@ -2719,7 +2722,7 @@
'&hub.challenge=this_is_my_fake_challenge_string'
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
def tearDown(self):
"""Tears down the test harness."""
@@ -2986,7 +2989,7 @@
'&hub.challenge=this_is_my_fake_challenge_string'
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
'&hub.mode=%s'
- '&hub.lease_seconds=7776000')
+ '&hub.lease_seconds=864000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
self.challenge)
@@ -3014,7 +3017,7 @@
'&hub.challenge=this_is_my_fake_challenge_string'
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
self.challenge)
@@ -3147,7 +3150,7 @@
'&hub.challenge=this_is_my_fake_challenge_string'
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic%%2FCaSeSeNsItIvE'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
self.challenge)
@@ -3180,7 +3183,7 @@
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
'%%2F%%7Eone%%3Atwo%%2F%%26%%3D'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
self.challenge)
@@ -3220,7 +3223,7 @@
'blah%%2F%%25E3%%2583%%2596%%25E3%%2583%%25AD'
'%%25E3%%2582%%25B0%%25E8%%25A1%%2586'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
self.challenge)
@@ -3267,7 +3270,7 @@
'blah%%2F%%25E3%%2583%%2596%%25E3%%2583%%25AD'
'%%25E3%%2582%%25B0%%25E8%%25A1%%2586'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
self.challenge)
@@ -3314,7 +3317,7 @@
'&hub.challenge=this_is_my_fake_challenge_string'
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
def tearDown(self):
"""Verify that all URL fetches occurred."""
@@ -3413,7 +3416,7 @@
'&hub.challenge=this_is_my_fake_challenge_string'
'&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
'&hub.mode=%s'
- '&hub.lease_seconds=2592000')
+ '&hub.lease_seconds=432000')
urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe',
200,
@@ -3620,95 +3623,45 @@
class SubscriptionReconfirmHandlerTest(testutil.HandlerTestBase):
"""Tests for the periodic subscription reconfirming worker."""
- def setUp(self):
- """Sets up the test harness."""
+ def testFullFlow(self):
+ """Tests a full flow through the reconfirm worker."""
self.now = time.time()
- self.now_datetime = datetime.datetime.utcfromtimestamp(self.now)
- self.confirm_time = self.now - main.SUBSCRIPTION_CHECK_BUFFER_SECONDS
+ self.called = False
+ def start_map(*args, **kwargs):
+ self.assertEquals(kwargs, {
+ 'name': 'Reconfirm expiring subscriptions',
+ 'reader_spec': 'mapreduce.input_readers.DatastoreInputReader',
+ 'queue_name': 'polling',
+ 'handler_spec': 'offline_jobs.SubscriptionReconfirmMapper.run',
+ 'shard_count': 4,
+ 'reader_parameters': {
+ 'entity_kind': 'main.Subscription',
+ 'processing_rate': 100000
+ },
+ 'mapreduce_parameters': {
+ 'done_callback': '/work/cleanup_mapper',
+ 'done_callback_queue': 'polling',
+ 'threshold_timestamp':
+ int(self.now + main.SUBSCRIPTION_CHECK_BUFFER_SECONDS)
+ },
+ })
+ self.called = True
+
def create_handler():
- return main.SubscriptionReconfirmHandler(now=lambda: self.now)
+ return main.SubscriptionReconfirmHandler(
+ now=lambda: self.now,
+ start_map=start_map)
self.handler_class = create_handler
- testutil.HandlerTestBase.setUp(self)
- self.original_chunk_size = main.SUBSCRIPTION_CHECK_CHUNK_SIZE
- main.SUBSCRIPTION_CHECK_CHUNK_SIZE = 2
+
os.environ['HTTP_X_APPENGINE_QUEUENAME'] = main.POLLING_QUEUE
-
- def tearDown(self):
- """Tears down the test harness."""
- testutil.HandlerTestBase.tearDown(self)
- main.SUBSCRIPTION_CHECK_CHUNK_SIZE = self.original_chunk_size
- del os.environ['HTTP_X_APPENGINE_QUEUENAME']
-
- def testFullFlow(self):
- """Tests a full flow through multiple chunks of the reconfirm
worker."""
- topic = 'http://example.com/topic'
- # Funny endings to maintain alphabetical order with hashes of callback
- # URL and topic URL.
- callback = 'http://example.com/callback1-ad'
- callback2 = 'http://example.com/callback2-b'
- callback3 = 'http://example.com/callback3-d'
- callback4 = 'http://example.com/callback4-a'
- token = 'my token'
- secret = 'my secret'
- lease_seconds = -main.SUBSCRIPTION_CHECK_BUFFER_SECONDS - 1
- now = lambda: self.now_datetime
-
- self.handle('get')
- task = testutil.get_tasks(main.POLLING_QUEUE, index=0,
expected_count=1)
- time_offset = task['params']['time_offset']
-
- # There will be four Subscriptions instances, three of which will
actually
- # be affected by this check.
- Subscription.insert(callback, topic, token, secret,
- lease_seconds=lease_seconds, now=now)
- Subscription.insert(callback2, topic, token, secret,
- lease_seconds=lease_seconds, now=now)
- Subscription.insert(callback3, topic, token, secret,
-
lease_seconds=2*main.SUBSCRIPTION_CHECK_BUFFER_SECONDS,
- now=now)
- Subscription.insert(callback4, topic, token, secret,
- lease_seconds=lease_seconds, now=now)
-
- all_subs = list(Subscription.all())
- confirm_tasks = []
-
- # Now run the post handler with the params from the first task. This
will
- # enqueue another task that takes on the second chunk of work and also
- # will enqueue tasks to confirm subscriptions.
- self.handle('post', *task['params'].items())
- confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=2))
- confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=3))
-
- # Run another post handler, which will pick up the remaining
subscription
- # confirmation and finish the work effort. Properly handle a race
- # condition where Subscription tasks may be inserted with an ETA before
- # the continuation task.
- all_tasks = testutil.get_tasks(main.POLLING_QUEUE, expected_count=4)
- task = [a for a in all_tasks[1:] if 'time_offset' in a['params']][0]
-
- # Run this task twice; the second time should do nothing.
- self.handle('post', *task['params'].items())
- self.handle('post', *task['params'].items())
-
- # Last task will find no more work to do.
- task = testutil.get_tasks(main.POLLING_QUEUE, index=4,
expected_count=6)
- if 'time_offset' not in task['params']:
- logging.info("FAIL!")
- task = testutil.get_tasks(main.POLLING_QUEUE, index=5)
- confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=4))
- else:
- confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=5))
-
- self.handle('post', *task['params'].items())
- testutil.get_tasks(main.POLLING_QUEUE, expected_count=6)
-
- # Verify all confirmation tasks.
- self.assertEquals(callback3, all_subs[2].callback)
- del all_subs[2]
- confirm_key_names = [s.key().name() for s in all_subs]
- found_key_names = [
- t['params']['subscription_key_name'] for t in confirm_tasks]
- self.assertEquals(confirm_key_names, found_key_names)
+ try:
+ self.handle('get')
+ task = testutil.get_tasks(main.POLLING_QUEUE, index=0,
expected_count=1)
+ self.handle('post')
+ finally:
+ del os.environ['HTTP_X_APPENGINE_QUEUENAME']
+
+ self.assertTrue(self.called)
class SubscriptionCleanupHandlerTest(testutil.HandlerTestBase):
@@ -3735,6 +3688,40 @@
self.assertEquals(2 * [Subscription.STATE_VERIFIED],
[s.subscription_state for s in Subscription.all()])
+
+class CleanupMapperHandlerTest(testutil.HandlerTestBase):
+ """Tests for the CleanupMapperHandler."""
+
+ handler_class = main.CleanupMapperHandler
+
+ def testMissing(self):
+ """Tests cleaning up a mapreduce that's not present."""
+ self.assertEquals([], list(mapreduce.model.MapreduceState.all()))
+ os.environ['HTTP_MAPREDUCE_ID'] = '12345'
+ try:
+ self.handle('post')
+ finally:
+ del os.environ['HTTP_MAPREDUCE_ID']
+ self.assertEquals([], list(mapreduce.model.MapreduceState.all()))
+
+ def testPresent(self):
+ """Tests cleaning up a mapreduce that's present."""
+ mapreduce_id = mapreduce.control.start_map(
+ name='Reconfirm expiring subscriptions',
+ handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
+ reader_spec='mapreduce.input_readers.DatastoreInputReader',
+ reader_parameters=dict(
+ processing_rate=100000,
+ entity_kind='main.Subscription'))
+
+ self.assertEquals(1, len(list(mapreduce.model.MapreduceState.all())))
+ os.environ['HTTP_MAPREDUCE_ID'] = mapreduce_id
+ try:
+ self.handle('post')
+ finally:
+ del os.environ['HTTP_MAPREDUCE_ID']
+ self.assertEquals([], list(mapreduce.model.MapreduceState.all()))
+
################################################################################
PollingMarker = main.PollingMarker
=======================================
--- /trunk/hub/mapreduce.yaml Sat Jun 5 18:18:50 2010
+++ /trunk/hub/mapreduce.yaml Wed Sep 22 15:16:49 2010
@@ -24,3 +24,15 @@
- name: age_days
default: 14
params_validator: offline_jobs.CleanupOldEventToDeliver.validate_params
+- name: Reconfirm expiring subscriptions
+ mapper:
+ input_reader: mapreduce.input_readers.DatastoreInputReader
+ handler: offline_jobs.SubscriptionReconfirmMapper.run
+ params:
+ - name: entity_kind
+ default: main.Subscription
+ - name: shard_count
+ default: 32
+ - name: processing_rate
+ default: 100000
+ params_validator: offline_jobs.SubscriptionReconfirmMapper.validate_params
=======================================
--- /trunk/hub/offline_jobs.py Sat Jun 5 18:18:50 2010
+++ /trunk/hub/offline_jobs.py Wed Sep 22 15:16:49 2010
@@ -15,7 +15,7 @@
# limitations under the License.
#
-"""Offline analysis jobs used with the hub."""
+"""Offline cleanup and analysis jobs used with the hub."""
import datetime
import logging
@@ -23,6 +23,8 @@
from google.appengine.ext import db
+import main
+
from mapreduce import context
from mapreduce import operation as op
@@ -58,3 +60,27 @@
if event.last_modified < self.oldest_last_modified:
yield op.db.Delete(event)
+
+
+class SubscriptionReconfirmMapper(object):
+ """For reconfirming subscriptions that are nearing expiration."""
+
+ @staticmethod
+ def validate_params(params):
+ assert 'threshold_timestamp' in params
+
+ def __init__(self):
+ self.threshold_timestamp = None
+
+ def run(self, sub):
+ if sub.subscription_state != main.Subscription.STATE_VERIFIED:
+ return
+
+ if self.threshold_timestamp is None:
+ params = context.get().mapreduce_spec.params
+ self.threshold_timestamp = datetime.datetime.utcfromtimestamp(
+ params['threshold_timestamp'])
+
+ if sub.expiration_time < self.threshold_timestamp:
+ sub.request_insert(sub.callback, sub.topic, sub.verify_token,
+ sub.secret, auto_reconfirm=True)