Revision: 387
Author: bslatkin
Date: Wed Sep 22 15:16:49 2010
Log: use mapper framework for subscription reconfirmation
http://code.google.com/p/pubsubhubbub/source/detail?r=387

Modified:
 /trunk/hub
 /trunk/hub/async_apiproxy.py
 /trunk/hub/index.yaml
 /trunk/hub/main.py
 /trunk/hub/main_test.py
 /trunk/hub/mapreduce.yaml
 /trunk/hub/offline_jobs.py

=======================================
--- /trunk/hub/async_apiproxy.py        Sun Jul 11 22:58:45 2010
+++ /trunk/hub/async_apiproxy.py        Wed Sep 22 15:16:49 2010
@@ -126,10 +126,8 @@

   def wait(self):
     """Wait for RPCs to finish. Returns True if any were processed."""
-    while self.enqueued:
+    while self.enqueued or self.complete:
# Run the callbacks before even waiting, because a response could have
       # come back during any outbound API call.
       self._run_callbacks()
       self._wait_one()
-    # Run them one last time after waiting to pick up the final callback!
-    self._run_callbacks()
=======================================
--- /trunk/hub/index.yaml       Wed Feb  3 09:37:28 2010
+++ /trunk/hub/index.yaml       Wed Sep 22 15:16:49 2010
@@ -24,3 +24,8 @@
 # manually, move them above the marker line.  The index.yaml file is
 # automatically uploaded to the admin console when you next deploy
 # your application using appcfg.py.
+
+- kind: Subscription
+  properties:
+  - name: __key__
+    direction: desc
=======================================
--- /trunk/hub/main.py  Sun Jul 11 22:57:08 2010
+++ /trunk/hub/main.py  Wed Sep 22 15:16:49 2010
@@ -119,6 +119,9 @@
 import fork_join_queue
 import urlfetch_async

+import mapreduce.control
+import mapreduce.model
+
 async_proxy = async_apiproxy.AsyncAPIProxy()

################################################################################
@@ -168,17 +171,17 @@
 # How far before expiration to refresh subscriptions.
 SUBSCRIPTION_CHECK_BUFFER_SECONDS = (24 * 60 * 60)  # 24 hours

-# How many subscriber checking tasks to scheudle at a time.
-SUBSCRIPTION_CHECK_CHUNK_SIZE = 200
+# How many mapper shards to use for reconfirming subscriptions.
+SUBSCRIPTION_RECONFIRM_SHARD_COUNT = 4

 # How often to poll feeds.
 POLLING_BOOTSTRAP_PERIOD = 10800  # in seconds; 3 hours

 # Default expiration time of a lease.
-DEFAULT_LEASE_SECONDS = (30 * 24 * 60 * 60)  # 30 days
+DEFAULT_LEASE_SECONDS = (5 * 24 * 60 * 60)  # 5 days

 # Maximum expiration time of a lease.
-MAX_LEASE_SECONDS = DEFAULT_LEASE_SECONDS * 3  # 90 days
+MAX_LEASE_SECONDS = (10 * 24 * 60 * 60)  # 10 days

 # Maximum number of redirects to follow when feed fetching.
 MAX_REDIRECTS = 7
@@ -2111,62 +2114,41 @@
 class SubscriptionReconfirmHandler(webapp.RequestHandler):
"""Periodic handler causes reconfirmation for almost expired subscriptions."""

-  def __init__(self, now=time.time):
+  def __init__(self, now=time.time, start_map=mapreduce.control.start_map):
     """Initializer."""
     webapp.RequestHandler.__init__(self)
     self.now = now
+    self.start_map = start_map

   @work_queue_only
   def get(self):
-    threshold_timestamp = str(
-        int(self.now() - SUBSCRIPTION_CHECK_BUFFER_SECONDS))
- # NOTE: See PollBootstrapHandler as to why we need a named task here for
-    # the first insertion and the rest of the sequence.
-    name = 'reconfirm-' + threshold_timestamp
+    # Use the name, such that only one of these tasks runs per calendar day.
+    name = 'reconfirm-%s' % time.strftime('%Y-%m-%d', time.gmtime(self.now()))
     try:
       taskqueue.Task(
           url='/work/reconfirm_subscriptions',
-          name=name,
-          params=dict(time_offset=threshold_timestamp)
+          name=name
       ).add(POLLING_QUEUE)
except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
-      logging.exception('Could not enqueue FIRST reconfirmation task')
+      logging.exception('Could not enqueue FIRST reconfirmation task; '
+                        'must have already run today.')

   @work_queue_only
   def post(self):
-    time_offset = self.request.get('time_offset')
-    datetime_offset = datetime.datetime.utcfromtimestamp(int(time_offset))
-    key_offset = self.request.get('key_offset')
-    logging.info('Handling reconfirmations for time_offset = %s, '
-                 'current_key = %s', datetime_offset, key_offset)
-
-    query = (Subscription.all()
-             .filter('subscription_state =', Subscription.STATE_VERIFIED)
-             .order('__key__'))
-    if key_offset:
-      query.filter('__key__ >', db.Key(key_offset))
-
-    subscriptions = query.fetch(SUBSCRIPTION_CHECK_CHUNK_SIZE)
-    if not subscriptions:
-      logging.info('All done with periodic subscription reconfirmations')
-      return
-
-    next_key = str(subscriptions[-1].key())
-    try:
-      taskqueue.Task(
-          url='/work/reconfirm_subscriptions',
-          name='reconfirm-%s-%s' % (time_offset, sha1_hash(next_key)),
-          params=dict(time_offset=time_offset,
-                      key_offset=next_key)).add(POLLING_QUEUE)
- except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
-      logging.exception('Continued polling task already present; '
-                        'this work has already been done')
-      return
-
-    for sub in subscriptions:
-      if sub.expiration_time < datetime_offset:
-        sub.request_insert(sub.callback, sub.topic, sub.verify_token,
-                           sub.secret, auto_reconfirm=True)
+    self.start_map(
+        name='Reconfirm expiring subscriptions',
+        handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
+        reader_spec='mapreduce.input_readers.DatastoreInputReader',
+        reader_parameters=dict(
+            processing_rate=100000,
+            entity_kind='main.Subscription'),
+        shard_count=SUBSCRIPTION_RECONFIRM_SHARD_COUNT,
+        queue_name=POLLING_QUEUE,
+        mapreduce_parameters=dict(
+          threshold_timestamp=int(
+              self.now() + SUBSCRIPTION_CHECK_BUFFER_SECONDS),
+          done_callback='/work/cleanup_mapper',
+          done_callback_queue=POLLING_QUEUE))


 class SubscriptionCleanupHandler(webapp.RequestHandler):
@@ -2184,6 +2166,19 @@
except (db.Error, apiproxy_errors.Error, runtime.DeadlineExceededError):
         logging.exception('Could not clean-up Subscription instances')

+
+class CleanupMapperHandler(webapp.RequestHandler):
+  """Cleans up all data from a Mapper job run."""
+
+  @work_queue_only
+  def post(self):
+    mapreduce_id = self.request.headers.get('mapreduce-id')
+    # TODO: Use Mapper Cleanup API once available.
+    db.delete(mapreduce.model.MapreduceControl.get_key_by_job_id(mapreduce_id))
+    shards = mapreduce.model.ShardState.find_by_mapreduce_id(mapreduce_id)
+    db.delete(shards)
+    db.delete(mapreduce.model.MapreduceState.get_key_by_job_id(mapreduce_id))
+
################################################################################
 # Publishing handlers

@@ -2411,7 +2406,6 @@
              getattr(response, 'headers', None),
              getattr(response, 'content', None),
              exception)
-
   urlfetch_async.fetch(fetch_url,
                        headers=headers,
                        follow_redirects=False,
@@ -3445,7 +3439,8 @@
       (r'/work/poll_bootstrap', PollBootstrapHandler),
       (r'/work/event_cleanup', EventCleanupHandler),
       (r'/work/subscription_cleanup', SubscriptionCleanupHandler),
-      (r'/work/reconfirm_subscriptions', SubscriptionReconfirmHandler)
+      (r'/work/reconfirm_subscriptions', SubscriptionReconfirmHandler),
+      (r'/work/cleanup_mapper', CleanupMapperHandler),
     ])
   application = webapp.WSGIApplication(HANDLERS, debug=DEBUG)
   wsgiref.handlers.CGIHandler().run(application)
=======================================
--- /trunk/hub/main_test.py     Sun Jul 11 22:24:18 2010
+++ /trunk/hub/main_test.py     Wed Sep 22 15:16:49 2010
@@ -45,6 +45,9 @@
 import main
 import urlfetch_test_stub

+import mapreduce.control
+import mapreduce.model
+
################################################################################
 # For convenience

@@ -2719,7 +2722,7 @@
         '&hub.challenge=this_is_my_fake_challenge_string'
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')

   def tearDown(self):
     """Tears down the test harness."""
@@ -2986,7 +2989,7 @@
         '&hub.challenge=this_is_my_fake_challenge_string'
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
         '&hub.mode=%s'
-        '&hub.lease_seconds=7776000')
+        '&hub.lease_seconds=864000')
     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
         self.challenge)
@@ -3014,7 +3017,7 @@
         '&hub.challenge=this_is_my_fake_challenge_string'
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')
     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
         self.challenge)
@@ -3147,7 +3150,7 @@
         '&hub.challenge=this_is_my_fake_challenge_string'
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic%%2FCaSeSeNsItIvE'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')
     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
         self.challenge)
@@ -3180,7 +3183,7 @@
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
           '%%2F%%7Eone%%3Atwo%%2F%%26%%3D'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')
     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
         self.challenge)
@@ -3220,7 +3223,7 @@
             'blah%%2F%%25E3%%2583%%2596%%25E3%%2583%%25AD'
             '%%25E3%%2582%%25B0%%25E8%%25A1%%2586'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')
     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
         self.challenge)
@@ -3267,7 +3270,7 @@
             'blah%%2F%%25E3%%2583%%2596%%25E3%%2583%%25AD'
             '%%25E3%%2582%%25B0%%25E8%%25A1%%2586'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')
     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
         self.challenge)
@@ -3314,7 +3317,7 @@
         '&hub.challenge=this_is_my_fake_challenge_string'
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')

   def tearDown(self):
     """Verify that all URL fetches occurred."""
@@ -3413,7 +3416,7 @@
         '&hub.challenge=this_is_my_fake_challenge_string'
         '&hub.topic=http%%3A%%2F%%2Fexample.com%%2Fthe-topic'
         '&hub.mode=%s'
-        '&hub.lease_seconds=2592000')
+        '&hub.lease_seconds=432000')

     urlfetch_test_stub.instance.expect(
'get', self.verify_callback_querystring_template % 'subscribe', 200,
@@ -3620,95 +3623,45 @@
 class SubscriptionReconfirmHandlerTest(testutil.HandlerTestBase):
   """Tests for the periodic subscription reconfirming worker."""

-  def setUp(self):
-    """Sets up the test harness."""
+  def testFullFlow(self):
+    """Tests a full flow through the reconfirm worker."""
     self.now = time.time()
-    self.now_datetime = datetime.datetime.utcfromtimestamp(self.now)
-    self.confirm_time = self.now - main.SUBSCRIPTION_CHECK_BUFFER_SECONDS
+    self.called = False
+    def start_map(*args, **kwargs):
+      self.assertEquals(kwargs, {
+          'name': 'Reconfirm expiring subscriptions',
+          'reader_spec': 'mapreduce.input_readers.DatastoreInputReader',
+          'queue_name': 'polling',
+          'handler_spec': 'offline_jobs.SubscriptionReconfirmMapper.run',
+          'shard_count': 4,
+          'reader_parameters': {
+            'entity_kind': 'main.Subscription',
+            'processing_rate': 100000
+          },
+          'mapreduce_parameters': {
+            'done_callback': '/work/cleanup_mapper',
+            'done_callback_queue': 'polling',
+            'threshold_timestamp':
+                int(self.now + main.SUBSCRIPTION_CHECK_BUFFER_SECONDS)
+          },
+      })
+      self.called = True
+
     def create_handler():
-      return main.SubscriptionReconfirmHandler(now=lambda: self.now)
+      return main.SubscriptionReconfirmHandler(
+          now=lambda: self.now,
+          start_map=start_map)
     self.handler_class = create_handler
-    testutil.HandlerTestBase.setUp(self)
-    self.original_chunk_size = main.SUBSCRIPTION_CHECK_CHUNK_SIZE
-    main.SUBSCRIPTION_CHECK_CHUNK_SIZE = 2
+
     os.environ['HTTP_X_APPENGINE_QUEUENAME'] = main.POLLING_QUEUE
-
-  def tearDown(self):
-    """Tears down the test harness."""
-    testutil.HandlerTestBase.tearDown(self)
-    main.SUBSCRIPTION_CHECK_CHUNK_SIZE = self.original_chunk_size
-    del os.environ['HTTP_X_APPENGINE_QUEUENAME']
-
-  def testFullFlow(self):
- """Tests a full flow through multiple chunks of the reconfirm worker."""
-    topic = 'http://example.com/topic'
-    # Funny endings to maintain alphabetical order with hashes of callback
-    # URL and topic URL.
-    callback = 'http://example.com/callback1-ad'
-    callback2 = 'http://example.com/callback2-b'
-    callback3 = 'http://example.com/callback3-d'
-    callback4 = 'http://example.com/callback4-a'
-    token = 'my token'
-    secret = 'my secret'
-    lease_seconds = -main.SUBSCRIPTION_CHECK_BUFFER_SECONDS - 1
-    now = lambda: self.now_datetime
-
-    self.handle('get')
- task = testutil.get_tasks(main.POLLING_QUEUE, index=0, expected_count=1)
-    time_offset = task['params']['time_offset']
-
- # There will be four Subscriptions instances, three of which will actually
-    # be affected by this check.
-    Subscription.insert(callback, topic, token, secret,
-                        lease_seconds=lease_seconds, now=now)
-    Subscription.insert(callback2, topic, token, secret,
-                        lease_seconds=lease_seconds, now=now)
-    Subscription.insert(callback3, topic, token, secret,
- lease_seconds=2*main.SUBSCRIPTION_CHECK_BUFFER_SECONDS,
-                        now=now)
-    Subscription.insert(callback4, topic, token, secret,
-                        lease_seconds=lease_seconds, now=now)
-
-    all_subs = list(Subscription.all())
-    confirm_tasks = []
-
- # Now run the post handler with the params from the first task. This will
-    # enqueue another task that takes on the second chunk of work and also
-    # will enqueue tasks to confirm subscriptions.
-    self.handle('post', *task['params'].items())
-    confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=2))
-    confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=3))
-
- # Run another post handler, which will pick up the remaining subscription
-    # confirmation and finish the work effort. Properly handle a race
-    # condition where Subscription tasks may be inserted with an ETA before
-    # the continuation task.
-    all_tasks = testutil.get_tasks(main.POLLING_QUEUE, expected_count=4)
-    task = [a for a in all_tasks[1:] if 'time_offset' in a['params']][0]
-
-    # Run this task twice; the second time should do nothing.
-    self.handle('post', *task['params'].items())
-    self.handle('post', *task['params'].items())
-
-    # Last task will find no more work to do.
- task = testutil.get_tasks(main.POLLING_QUEUE, index=4, expected_count=6)
-    if 'time_offset' not in task['params']:
-      logging.info("FAIL!")
-      task = testutil.get_tasks(main.POLLING_QUEUE, index=5)
-      confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=4))
-    else:
-      confirm_tasks.append(testutil.get_tasks(main.POLLING_QUEUE, index=5))
-
-    self.handle('post', *task['params'].items())
-    testutil.get_tasks(main.POLLING_QUEUE, expected_count=6)
-
-    # Verify all confirmation tasks.
-    self.assertEquals(callback3, all_subs[2].callback)
-    del all_subs[2]
-    confirm_key_names = [s.key().name() for s in all_subs]
-    found_key_names = [
-        t['params']['subscription_key_name'] for t in confirm_tasks]
-    self.assertEquals(confirm_key_names, found_key_names)
+    try:
+      self.handle('get')
+ task = testutil.get_tasks(main.POLLING_QUEUE, index=0, expected_count=1)
+      self.handle('post')
+    finally:
+      del os.environ['HTTP_X_APPENGINE_QUEUENAME']
+
+    self.assertTrue(self.called)


 class SubscriptionCleanupHandlerTest(testutil.HandlerTestBase):
@@ -3735,6 +3688,40 @@
     self.assertEquals(2 * [Subscription.STATE_VERIFIED],
                       [s.subscription_state for s in Subscription.all()])

+
+class CleanupMapperHandlerTest(testutil.HandlerTestBase):
+  """Tests for the CleanupMapperHandler."""
+
+  handler_class = main.CleanupMapperHandler
+
+  def testMissing(self):
+    """Tests cleaning up a mapreduce that's not present."""
+    self.assertEquals([], list(mapreduce.model.MapreduceState.all()))
+    os.environ['HTTP_MAPREDUCE_ID'] = '12345'
+    try:
+      self.handle('post')
+    finally:
+      del os.environ['HTTP_MAPREDUCE_ID']
+    self.assertEquals([], list(mapreduce.model.MapreduceState.all()))
+
+  def testPresent(self):
+    """Tests cleaning up a mapreduce that's present."""
+    mapreduce_id = mapreduce.control.start_map(
+        name='Reconfirm expiring subscriptions',
+        handler_spec='offline_jobs.SubscriptionReconfirmMapper.run',
+        reader_spec='mapreduce.input_readers.DatastoreInputReader',
+        reader_parameters=dict(
+            processing_rate=100000,
+            entity_kind='main.Subscription'))
+
+    self.assertEquals(1, len(list(mapreduce.model.MapreduceState.all())))
+    os.environ['HTTP_MAPREDUCE_ID'] = mapreduce_id
+    try:
+      self.handle('post')
+    finally:
+      del os.environ['HTTP_MAPREDUCE_ID']
+    self.assertEquals([], list(mapreduce.model.MapreduceState.all()))
+
################################################################################

 PollingMarker = main.PollingMarker
=======================================
--- /trunk/hub/mapreduce.yaml   Sat Jun  5 18:18:50 2010
+++ /trunk/hub/mapreduce.yaml   Wed Sep 22 15:16:49 2010
@@ -24,3 +24,15 @@
     - name: age_days
       default: 14
     params_validator: offline_jobs.CleanupOldEventToDeliver.validate_params
+- name: Reconfirm expiring subscriptions
+  mapper:
+    input_reader: mapreduce.input_readers.DatastoreInputReader
+    handler: offline_jobs.SubscriptionReconfirmMapper.run
+    params:
+    - name: entity_kind
+      default: main.Subscription
+    - name: shard_count
+      default: 32
+    - name: processing_rate
+      default: 100000
+    params_validator: offline_jobs.SubscriptionReconfirmMapper.validate_params
=======================================
--- /trunk/hub/offline_jobs.py  Sat Jun  5 18:18:50 2010
+++ /trunk/hub/offline_jobs.py  Wed Sep 22 15:16:49 2010
@@ -15,7 +15,7 @@
 # limitations under the License.
 #

-"""Offline analysis jobs used with the hub."""
+"""Offline cleanup and analysis jobs used with the hub."""

 import datetime
 import logging
@@ -23,6 +23,8 @@

 from google.appengine.ext import db

+import main
+
 from mapreduce import context
 from mapreduce import operation as op

@@ -58,3 +60,27 @@

     if event.last_modified < self.oldest_last_modified:
       yield op.db.Delete(event)
+
+
+class SubscriptionReconfirmMapper(object):
+  """For reconfirming subscriptions that are nearing expiration."""
+
+  @staticmethod
+  def validate_params(params):
+    assert 'threshold_timestamp' in params
+
+  def __init__(self):
+    self.threshold_timestamp = None
+
+  def run(self, sub):
+    if sub.subscription_state != main.Subscription.STATE_VERIFIED:
+      return
+
+    if self.threshold_timestamp is None:
+      params = context.get().mapreduce_spec.params
+      self.threshold_timestamp = datetime.datetime.utcfromtimestamp(
+          params['threshold_timestamp'])
+
+    if sub.expiration_time < self.threshold_timestamp:
+      sub.request_insert(sub.callback, sub.topic, sub.verify_token,
+                         sub.secret, auto_reconfirm=True)

Reply via email to