Revision: 353
Author: bslatkin
Date: Thu Apr 29 17:01:08 2010
Log: hub task insertion transactional
http://code.google.com/p/pubsubhubbub/source/detail?r=353

Modified:
 /trunk/hub/main.py
 /trunk/hub/main_test.py

=======================================
--- /trunk/hub/main.py  Mon Mar 15 23:00:57 2010
+++ /trunk/hub/main.py  Thu Apr 29 17:01:08 2010
@@ -684,21 +684,6 @@
   return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))


-def retry_datastore_op(func, retries=5):
-  """Executes the given function and retries it on Datastore errors.
-
-  Args:
-    func: Callable to do the datastore operation.
-    retries: How many times to retry the operation.
-  """
-  for i in xrange(retries):
-    try:
-      return func()
-    except db.Error:
-      logging.exception('Error running Datastore operation, attempt %d', i+1)
-      if i == (retries-1):
-        raise
-
################################################################################
 # Models

@@ -811,9 +796,9 @@
     """Records that a callback URL needs verification before being subscribed.

Creates a new subscription request (for asynchronous verification) if None
-    already exists. Any existing subscription request will not be modified;
+    already exists. Any existing subscription request will be overridden;
     for instance, if a subscription has already been verified, this method
-    will do nothing.
+    will cause it to be reconfirmed.

     Args:
       callback: URL that will receive callbacks.
@@ -852,13 +837,12 @@
                       now() + datetime.timedelta(seconds=lease_seconds)))
       sub.confirm_failures = 0
       sub.put()
-      return (sub_is_new, sub)
-    new, sub = db.run_in_transaction(txn)
-    # Note: This enqueuing must come *after* the transaction is submitted, or
-    # else we'll actually run the task *before* the transaction is submitted.
-    sub.enqueue_task(cls.STATE_VERIFIED, verify_token, secret=secret,
-                     auto_reconfirm=auto_reconfirm)
-    return new
+      sub.enqueue_task(cls.STATE_VERIFIED,
+                       verify_token,
+                       secret=secret,
+                       auto_reconfirm=auto_reconfirm)
+      return sub_is_new
+    return db.run_in_transaction(txn)

   @classmethod
   def remove(cls, callback, topic):
@@ -906,14 +890,11 @@
       if sub is not None:
         sub.confirm_failures = 0
         sub.put()
-        return (True, sub)
-      return (False, sub)
-    removed, sub = db.run_in_transaction(txn)
-    # Note: This enqueuing must come *after* the transaction is submitted, or
-    # else we'll actually run the task *before* the transaction is submitted.
-    if sub:
-      sub.enqueue_task(cls.STATE_TO_DELETE, verify_token)
-    return removed
+        sub.enqueue_task(cls.STATE_TO_DELETE, verify_token)
+        return True
+      else:
+        return False
+    return db.run_in_transaction(txn)

   @classmethod
   def archive(cls, callback, topic):
@@ -990,7 +971,6 @@
         The new secret to use for this subscription after successful
         confirmation.
     """
- # TODO(bslatkin): Remove these retries when they're not needed in userland.
     RETRIES = 3
     if os.environ.get('HTTP_X_APPENGINE_QUEUENAME') == POLLING_QUEUE:
       target_queue = POLLING_QUEUE
@@ -1006,7 +986,7 @@
                     'verify_token': verify_token,
                     'secret': secret or '',
                     'auto_reconfirm': str(auto_reconfirm)}
-            ).add(target_queue)
+            ).add(target_queue, transactional=True)
       except (taskqueue.Error, apiproxy_errors.Error):
         logging.exception('Could not insert task to confirm '
                           'topic = %s, callback = %s',
@@ -1044,19 +1024,21 @@
       True if this Subscription confirmation should be retried again. Returns
       False if we should give up and never try again.
     """
-    if self.confirm_failures >= max_failures:
-      logging.warning('Max subscription failures exceeded, giving up.')
-      return False
-    else:
-      retry_delay = retry_period * (2 ** self.confirm_failures)
-      self.eta = now() + datetime.timedelta(seconds=retry_delay)
-      self.confirm_failures += 1
-      retry_datastore_op(lambda: self.put())
-      # TODO(bslatkin): Do this enqueuing transactionally.
-      self.enqueue_task(next_state, verify_token,
+    def txn():
+      if self.confirm_failures >= max_failures:
+        logging.warning('Max subscription failures exceeded, giving up.')
+        return False
+      else:
+        retry_delay = retry_period * (2 ** self.confirm_failures)
+        self.eta = now() + datetime.timedelta(seconds=retry_delay)
+        self.confirm_failures += 1
+      self.put()
+      self.enqueue_task(next_state,
+                        verify_token,
                         auto_reconfirm=auto_reconfirm,
                         secret=secret)
       return True
+    return db.run_in_transaction(txn)


 class FeedToFetch(db.Model):
@@ -1067,11 +1049,11 @@
   """

   topic = db.TextProperty(required=True)
-  eta = db.DateTimeProperty(auto_now_add=True)
-  fetching_failures = db.IntegerProperty(default=0)
+  eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
+  fetching_failures = db.IntegerProperty(default=0, indexed=False)
   totally_failed = db.BooleanProperty(default=False)
-  source_keys = db.StringListProperty()
-  source_values = db.StringListProperty()
+  source_keys = db.StringListProperty(indexed=False)
+  source_values = db.StringListProperty(indexed=False)

   # TODO(bslatkin): Add fetching failure reason (urlfetch, parsing, etc) and
   # surface it on the topic details page.
@@ -1113,11 +1095,8 @@
             source_keys=list(source_keys),
             source_values=list(source_values))
         for topic in set(topic_list)]
-    retry_datastore_op(lambda: db.put(feed_list))
- # TODO(bslatkin): Use a bulk interface or somehow merge combined fetches
-    # into a single task.
-    for feed in feed_list:
-      feed._enqueue_task()
+    db.put(feed_list)
+    FeedToFetch._enqueue_tasks(feed_list)

   def fetch_failed(self,
                    max_failures=MAX_FEED_PULL_FAILURES,
@@ -1133,18 +1112,18 @@
       retry_period: Initial period for doing exponential (base-2) backoff.
       now: Returns the current time as a UTC datetime.
     """
-    if self.fetching_failures >= max_failures:
-      logging.warning('Max fetching failures exceeded, giving up.')
-      self.totally_failed = True
-      retry_datastore_op(lambda: self.put())
-    else:
-      retry_delay = retry_period * (2 ** self.fetching_failures)
-      logging.warning('Fetching failed. Will retry in %s seconds', retry_delay)
-      self.eta = now() + datetime.timedelta(seconds=retry_delay)
-      self.fetching_failures += 1
-      retry_datastore_op(lambda: self.put())
-      # TODO(bslatkin): Do this enqueuing transactionally.
-      self._enqueue_task()
+    def txn():
+      if self.fetching_failures >= max_failures:
+        logging.warning('Max fetching failures exceeded, giving up.')
+        self.totally_failed = True
+      else:
+        retry_delay = retry_period * (2 ** self.fetching_failures)
+        logging.warning('Fetching failed. Will retry in %s seconds', retry_delay)
+        self.eta = now() + datetime.timedelta(seconds=retry_delay)
+        self.fetching_failures += 1
+        FeedToFetch._enqueue_tasks([self])
+      self.put()
+    db.run_in_transaction(txn)

   def done(self):
     """The feed fetch has completed successfully.
@@ -1166,30 +1145,41 @@
         return False
     return db.run_in_transaction(txn)

-  def _enqueue_task(self):
-    """Enqueues a task to fetch this feed."""
- # TODO(bslatkin): Remove these retries when they're not needed in userland.
-    RETRIES = 3
-    if self.fetching_failures > 0:
-      target_queue = FEED_RETRIES_QUEUE
-    elif os.environ.get('HTTP_X_APPENGINE_QUEUENAME') == POLLING_QUEUE:
-      target_queue = POLLING_QUEUE
-    else:
-      target_queue = FEED_QUEUE
-    for i in xrange(RETRIES):
-      try:
-        taskqueue.Task(
-            url='/work/pull_feeds',
-            eta=self.eta,
-            params={'topic': self.topic}
-            ).add(target_queue)
-      except (taskqueue.Error, apiproxy_errors.Error):
-        logging.exception('Could not insert task to fetch topic = %s',
-                          self.topic)
-        if i == (RETRIES - 1):
-          raise
+  def _get_task(self):
+    """Creates a task that will fetch this feed."""
+    return taskqueue.Task(
+        url='/work/pull_feeds',
+        eta=self.eta,
+        params={'topic': self.topic})
+
+  @staticmethod
+  def _enqueue_tasks(feed_list, transactional=False):
+    """Enqueues a task to fetch the given feeds."""
+    task_dict = {}
+    for feed in feed_list:
+      if feed.fetching_failures > 0:
+        target_queue = FEED_RETRIES_QUEUE
+      elif os.environ.get('HTTP_X_APPENGINE_QUEUENAME') == POLLING_QUEUE:
+        target_queue = POLLING_QUEUE
       else:
-        return
+        target_queue = FEED_QUEUE
+      task_dict.setdefault(target_queue, []).append(feed)
+
+    RETRIES = 3
+    for queue_name, feed_list in task_dict.iteritems():
+      for i in xrange(RETRIES):
+        try:
+          taskqueue.Queue(queue_name).add(
+              [f._get_task() for f in feed_list],
+              transactional=transactional)
+        except (taskqueue.Error, apiproxy_errors.Error):
+          logging.exception('Could not insert tasks on queue = %r '
+                            'to fetch topics = %r',
+                            [f.topic for f in feed_list])
+          if i == (RETRIES - 1):
+            raise
+        else:
+          return


 class FeedRecord(db.Model):
@@ -1507,7 +1497,7 @@
     if not more_callbacks and not self.failed_callbacks:
       logging.info('EventToDeliver complete: topic = %s, delivery_mode = %s',
                    self.topic, self.delivery_mode)
-      retry_datastore_op(lambda: self.delete())
+      self.delete()
       return
     elif not more_callbacks:
       self.last_callback = ''
@@ -1528,14 +1518,14 @@
                         len(self.failed_callbacks), self.last_modified,
                         self.totally_failed)

-    retry_datastore_op(lambda: self.put())
-    if not self.totally_failed:
-      # TODO(bslatkin): Do this enqueuing transactionally.
-      self.enqueue()
+    def txn():
+      self.put()
+      if not self.totally_failed:
+        self.enqueue()
+    db.run_in_transaction(txn)

   def enqueue(self):
     """Enqueues a Task that will execute this EventToDeliver."""
- # TODO(bslatkin): Remove these retries when they're not needed in userland.
     RETRIES = 3
     if self.delivery_mode == EventToDeliver.RETRY:
       target_queue = EVENT_RETRIES_QUEUE
@@ -1549,7 +1539,7 @@
             url='/work/push_events',
             eta=self.last_modified,
             params={'event_key': self.key()}
-            ).add(target_queue)
+            ).add(target_queue, transactional=True)
       except (taskqueue.Error, apiproxy_errors.Error):
         logging.exception('Could not insert task to deliver '
                           'events for topic = %s', self.topic)
@@ -2186,7 +2176,7 @@
   def get(self):
     self.response.out.write(template.render('publish_debug.html', {}))

-  @dos.limit(count=100, period=1)  # TODO(bslatkin): need whitelist
+  @dos.limit(count=100, period=1)
   def post(self):
     self.response.headers['Content-Type'] = 'text/plain'

@@ -2412,6 +2402,8 @@
         all_entities.insert(0, group[len(group)/2:])
         all_entities.insert(0, group[:len(group)/2])
         raise
+    if event_to_deliver:
+      event_to_deliver.enqueue()

   for i in xrange(PUT_SPLITTING_ATTEMPTS):
     try:
@@ -2424,11 +2416,6 @@
                      'request size; dropping event for %s', feed_record.topic)
     return True

-  # TODO(bslatkin): Make this transactional with the call to work.done()
-  # that happens in the PullFeedHandler.post() method.
-  if event_to_deliver:
-    event_to_deliver.enqueue()
-
   # Inform any hooks that there will is a new event to deliver that has
   # been recorded and delivery has begun.
   hooks.execute(inform_event, event_to_deliver)
@@ -2614,7 +2601,7 @@
         continue

       headers = {
-        # TODO(bslatkin): Remove the 'or' here once migration is done.
+        # In case there was no content type header.
         'Content-Type': work.content_type or 'text/xml',
         # TODO(bslatkin): add a better test for verify_token here.
         'X-Hub-Signature': 'sha1=%s' % sha1_hmac(
@@ -2751,7 +2738,7 @@
       current_key = None

################################################################################
-# feed canonicalization
+# Feed canonicalization

 class RecordFeedHandler(webapp.RequestHandler):
   """Background worker for categorizing/classifying feed URLs by their ID."""
@@ -2786,14 +2773,14 @@
       response = urlfetch.fetch(topic)
     except (apiproxy_errors.Error, urlfetch.Error):
       logging.exception('Could not fetch topic = %s for feed ID', topic)
-      retry_datastore_op(lambda: known_feed.put())
+      known_feed.put()
       return

     # TODO(bslatkin): Add more intelligent retrying of feed identification.
     if response.status_code != 200:
       logging.warning('Fetching topic = %s for feed ID returned response %s',
                       topic, response.status_code)
-      retry_datastore_op(lambda: known_feed.put())
+      known_feed.put()
       return

     order = (ATOM, RSS)
@@ -2816,7 +2803,7 @@
     if parse_failures == len(order) or feed_id is None:
       logging.warning('Could not record feed ID for topic = %s:\n%s',
                       topic, error_traceback)
-      retry_datastore_op(lambda: known_feed.put())
+      known_feed.put()
       # Just give up, since we can't parse it. This case also covers when
       # the character encoding for the document is unsupported.
       return
@@ -2831,7 +2818,7 @@

     KnownFeedIdentity.update(feed_id, topic)
     known_feed.feed_id = feed_id
-    retry_datastore_op(lambda: known_feed.put())
+    known_feed.put()

################################################################################

=======================================
--- /trunk/hub/main_test.py     Fri Feb 26 11:54:31 2010
+++ /trunk/hub/main_test.py     Thu Apr 29 17:01:08 2010
@@ -109,17 +109,6 @@
            u'/07256788297315478906/label/\u30d6\u30ed\u30b0\u8846')
     self.assertEquals(good_iri, main.normalize_iri(iri))

-  def testRetryDatastoreOp(self):
-    """Tests the retry_datastore_op function."""
-    tries = [0]
-    def my_func():
-      tries[0] += 1
-      raise db.Error('Doh')
-    self.assertRaises(db.Error,
-                      main.retry_datastore_op,
-                      my_func)
-    self.assertEquals(5, tries[0])
-
################################################################################

 class TestWorkQueueHandler(webapp.RequestHandler):
@@ -1130,11 +1119,14 @@
   def testQueuePreserved(self):
     """Tests that enqueueing an EventToDeliver preserves the polling queue."""
     event, work_key, sub_list, sub_keys = self.insert_subscriptions()
-    event.enqueue()
+    def txn():
+      event.enqueue()
+    db.run_in_transaction(txn)
+
     testutil.get_tasks(main.EVENT_QUEUE, expected_count=1)
     os.environ['HTTP_X_APPENGINE_QUEUENAME'] = main.POLLING_QUEUE
     try:
-      event.enqueue()
+      db.run_in_transaction(txn)
     finally:
       del os.environ['HTTP_X_APPENGINE_QUEUENAME']

Reply via email to