Revision: 323
Author: bslatkin
Date: Wed Feb  3 10:26:36 2010
Log: more hub hardening; bad url redirects, datastore retries, encoding errors
http://code.google.com/p/pubsubhubbub/source/detail?r=323

Modified:
 /trunk/hub/main.py
 /trunk/hub/main_test.py

=======================================
--- /trunk/hub/main.py  Wed Feb  3 09:48:25 2010
+++ /trunk/hub/main.py  Wed Feb  3 10:26:36 2010
@@ -339,6 +339,22 @@
   """Returns a string containing a random challenge token."""
   return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))

+
+def retry_datastore_op(func, retries=5):
+  """Executes the given function and retries it on Datastore errors.
+
+  Args:
+    func: Callable to do the datastore operation.
+    retries: How many times to retry the operation.
+  """
+  for i in xrange(retries):
+    try:
+      return func()
+    except db.Error:
+      logging.exception('Error running Datastore operation, attempt %d', i+1)
+      if i == (retries-1):
+        raise
+
################################################################################
 # Models

@@ -691,7 +707,7 @@
       retry_delay = retry_period * (2 ** self.confirm_failures)
       self.eta = now() + datetime.timedelta(seconds=retry_delay)
       self.confirm_failures += 1
-      self.put()
+      retry_datastore_op(lambda: self.put())
       # TODO(bslatkin): Do this enqueuing transactionally.
       self.enqueue_task(next_state, verify_token,
                         auto_reconfirm=auto_reconfirm,
@@ -753,7 +769,7 @@
             source_keys=list(source_keys),
             source_values=list(source_values))
         for topic in set(topic_list)]
-    db.put(feed_list)
+    retry_datastore_op(lambda: db.put(feed_list))
     # TODO(bslatkin): Use a bulk interface or somehow merge combined fetches
     # into a single task.
     for feed in feed_list:
@@ -776,13 +792,13 @@
     if self.fetching_failures >= max_failures:
       logging.warning('Max fetching failures exceeded, giving up.')
       self.totally_failed = True
-      self.put()
+      retry_datastore_op(lambda: self.put())
     else:
       retry_delay = retry_period * (2 ** self.fetching_failures)
      logging.warning('Fetching failed. Will retry in %s seconds', retry_delay)
       self.eta = now() + datetime.timedelta(seconds=retry_delay)
       self.fetching_failures += 1
-      self.put()
+      retry_datastore_op(lambda: self.put())
       # TODO(bslatkin): Do this enqueuing transactionally.
       self._enqueue_task()

@@ -1150,7 +1166,7 @@
     if not more_callbacks and not self.failed_callbacks:
      logging.info('EventToDeliver complete: topic = %s, delivery_mode = %s',
                    self.topic, self.delivery_mode)
-      self.delete()
+      retry_datastore_op(lambda: self.delete())
       return
     elif not more_callbacks:
       self.last_callback = ''
@@ -1171,7 +1187,7 @@
                         len(self.failed_callbacks), self.last_modified,
                         self.totally_failed)

-    self.put()
+    retry_datastore_op(lambda: self.put())
     if not self.totally_failed:
       # TODO(bslatkin): Do this enqueuing transactionally.
       self.enqueue()
@@ -1985,6 +2001,13 @@
          'Could not get entries for content of %d bytes in format "%s":\n%s',
           len(content), format, error_traceback)
       parse_failures += 1
+    except LookupError, e:
+      error_traceback = traceback.format_exc()
+      logging.warning('Could not decode encoding of feed document %s\n%s',
+                      feed_record.topic, error_traceback)
+      # Yes-- returning True here. This feed is beyond all hope because we just
+      # don't support this character encoding presently.
+      return True

   if parse_failures == len(order):
     logging.error('Could not parse feed; giving up:\n%s', error_traceback)
@@ -2103,6 +2126,11 @@
                          'skipping', work.topic, fetch_url)
         work.done()
         return
+      except urlfetch.InvalidURLError:
+        logging.critical('Invalid redirection for topic %s to url %s; '
+                         'skipping', work.topic, fetch_url)
+        work.done()
+        return
       except (apiproxy_errors.Error, urlfetch.Error):
         logging.exception('Failed to fetch feed')
         work.fetch_failed()
@@ -2362,14 +2390,14 @@
       response = urlfetch.fetch(topic)
     except (apiproxy_errors.Error, urlfetch.Error):
       logging.exception('Could not fetch topic = %s for feed ID', topic)
-      known_feed.put()
+      retry_datastore_op(lambda: known_feed.put())
       return

     # TODO(bslatkin): Add more intelligent retrying of feed identification.
     if response.status_code != 200:
      logging.warning('Fetching topic = %s for feed ID returned response %s',
                       topic, response.status_code)
-      known_feed.put()
+      retry_datastore_op(lambda: known_feed.put())
       return

     order = (ATOM, RSS)
@@ -2392,7 +2420,9 @@
     if parse_failures == len(order) or feed_id is None:
       logging.warning('Could not record feed ID for topic = %s:\n%s',
                       topic, error_traceback)
-      known_feed.put()
+      retry_datastore_op(lambda: known_feed.put())
+      # Just give up, since we can't parse it. This case also covers when
+      # the character encoding for the document is unsupported.
       return

     logging.info('For topic = %s found new feed ID %r; old feed ID was %r',
@@ -2405,7 +2435,7 @@

     KnownFeedIdentity.update(feed_id, topic)
     known_feed.feed_id = feed_id
-    known_feed.put()
+    retry_datastore_op(lambda: known_feed.put())

################################################################################

=======================================
--- /trunk/hub/main_test.py     Wed Feb  3 09:48:25 2010
+++ /trunk/hub/main_test.py     Wed Feb  3 10:26:36 2010
@@ -109,6 +109,17 @@
            u'/07256788297315478906/label/\u30d6\u30ed\u30b0\u8846')
     self.assertEquals(good_iri, main.normalize_iri(iri))

+  def testRetryDatastoreOp(self):
+    """Tests the retry_datastore_op function."""
+    tries = [0]
+    def my_func():
+      tries[0] += 1
+      raise db.Error('Doh')
+    self.assertRaises(db.Error,
+                      main.retry_datastore_op,
+                      my_func)
+    self.assertEquals(5, tries[0])
+
################################################################################

 class TestWorkQueueHandler(webapp.RequestHandler):
@@ -1741,6 +1752,23 @@
    tasks.extend(testutil.get_tasks(main.FEED_RETRIES_QUEUE, expected_count=1))
    self.assertEquals([self.topic] * 2, [t['params']['topic'] for t in tasks])

+  def testRedirectToBadUrl(self):
+    """Tests when the redirect URL is bad."""
+    info = FeedRecord.get_or_create(self.topic)
+    info.update(self.headers)
+    info.put()
+    FeedToFetch.insert([self.topic])
+
+    real_topic = '/not/a/valid-redirect-location'
+    self.headers['Location'] = real_topic
+    urlfetch_test_stub.instance.expect(
+        'get', self.topic, 302, '',
+        response_headers=self.headers.copy())
+
+    self.handle('post', ('topic', self.topic))
+    self.assertTrue(EventToDeliver.all().get() is None)
+    testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
+
   def testPutSplitting(self):
     """Tests that put() calls for feed records are split when too large."""
     # Make the content way too big.
@@ -1918,6 +1946,20 @@
     feed = FeedToFetch.get_by_key_name(get_hash_key_name(topic))
     self.assertTrue(feed is None)

+  def testPullBadEncoding(self):
+    """Tests when the content has a bad character encoding."""
+    data = ('<?xml version="1.0" encoding="x-windows-874"?>\n'
+            '<feed><my header="data"/>'
+            '<entry><id>1</id><updated>123</updated>wooh</entry></feed>')
+    topic = 'http://example.com/my-topic'
+    callback = 'http://example.com/my-subscriber'
+    self.assertTrue(Subscription.insert(callback, topic, 'token', 'secret'))
+    FeedToFetch.insert([topic])
+    urlfetch_test_stub.instance.expect('get', topic, 200, data)
+    self.handle('post', ('topic', topic))
+    feed = FeedToFetch.get_by_key_name(get_hash_key_name(topic))
+    self.assertTrue(feed is None)
+
   def testPullGoodAtom(self):
     """Tests when the Atom XML can parse just fine."""
    data = ('<?xml version="1.0" encoding="utf-8"?>\n<feed><my header="data"/>'

Reply via email to