Revision: 323
Author: bslatkin
Date: Wed Feb 3 10:26:36 2010
Log: more hub hardening; bad url redirects, datastore retries, encoding
errors
http://code.google.com/p/pubsubhubbub/source/detail?r=323
Modified:
/trunk/hub/main.py
/trunk/hub/main_test.py
=======================================
--- /trunk/hub/main.py Wed Feb 3 09:48:25 2010
+++ /trunk/hub/main.py Wed Feb 3 10:26:36 2010
@@ -339,6 +339,22 @@
"""Returns a string containing a random challenge token."""
return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))
+
+def retry_datastore_op(func, retries=5):
+ """Executes the given function and retries it on Datastore errors.
+
+ Args:
+ func: Callable to do the datastore operation.
+ retries: How many times to retry the operation.
+ """
+ for i in xrange(retries):
+ try:
+ return func()
+ except db.Error:
+      logging.exception('Error running Datastore operation, attempt %d', i+1)
+ if i == (retries-1):
+ raise
+
################################################################################
# Models
@@ -691,7 +707,7 @@
retry_delay = retry_period * (2 ** self.confirm_failures)
self.eta = now() + datetime.timedelta(seconds=retry_delay)
self.confirm_failures += 1
- self.put()
+ retry_datastore_op(lambda: self.put())
# TODO(bslatkin): Do this enqueuing transactionally.
self.enqueue_task(next_state, verify_token,
auto_reconfirm=auto_reconfirm,
@@ -753,7 +769,7 @@
source_keys=list(source_keys),
source_values=list(source_values))
for topic in set(topic_list)]
- db.put(feed_list)
+ retry_datastore_op(lambda: db.put(feed_list))
    # TODO(bslatkin): Use a bulk interface or somehow merge combined fetches
    # into a single task.
for feed in feed_list:
@@ -776,13 +792,13 @@
if self.fetching_failures >= max_failures:
logging.warning('Max fetching failures exceeded, giving up.')
self.totally_failed = True
- self.put()
+ retry_datastore_op(lambda: self.put())
else:
retry_delay = retry_period * (2 ** self.fetching_failures)
logging.warning('Fetching failed. Will retry in %s seconds',
retry_delay)
self.eta = now() + datetime.timedelta(seconds=retry_delay)
self.fetching_failures += 1
- self.put()
+ retry_datastore_op(lambda: self.put())
# TODO(bslatkin): Do this enqueuing transactionally.
self._enqueue_task()
@@ -1150,7 +1166,7 @@
if not more_callbacks and not self.failed_callbacks:
      logging.info('EventToDeliver complete: topic = %s, delivery_mode = %s',
self.topic, self.delivery_mode)
- self.delete()
+ retry_datastore_op(lambda: self.delete())
return
elif not more_callbacks:
self.last_callback = ''
@@ -1171,7 +1187,7 @@
len(self.failed_callbacks), self.last_modified,
self.totally_failed)
- self.put()
+ retry_datastore_op(lambda: self.put())
if not self.totally_failed:
# TODO(bslatkin): Do this enqueuing transactionally.
self.enqueue()
@@ -1985,6 +2001,13 @@
          'Could not get entries for content of %d bytes in format "%s":\n%s',
len(content), format, error_traceback)
parse_failures += 1
+ except LookupError, e:
+ error_traceback = traceback.format_exc()
+ logging.warning('Could not decode encoding of feed document %s\n%s',
+ feed_record.topic, error_traceback)
+    # Yes-- returning True here. This feed is beyond all hope because we just
+ # don't support this character encoding presently.
+ return True
if parse_failures == len(order):
logging.error('Could not parse feed; giving up:\n%s', error_traceback)
@@ -2103,6 +2126,11 @@
'skipping', work.topic, fetch_url)
work.done()
return
+ except urlfetch.InvalidURLError:
+ logging.critical('Invalid redirection for topic %s to url %s; '
+ 'skipping', work.topic, fetch_url)
+ work.done()
+ return
except (apiproxy_errors.Error, urlfetch.Error):
logging.exception('Failed to fetch feed')
work.fetch_failed()
@@ -2362,14 +2390,14 @@
response = urlfetch.fetch(topic)
except (apiproxy_errors.Error, urlfetch.Error):
logging.exception('Could not fetch topic = %s for feed ID', topic)
- known_feed.put()
+ retry_datastore_op(lambda: known_feed.put())
return
# TODO(bslatkin): Add more intelligent retrying of feed identification.
if response.status_code != 200:
    logging.warning('Fetching topic = %s for feed ID returned response %s',
topic, response.status_code)
- known_feed.put()
+ retry_datastore_op(lambda: known_feed.put())
return
order = (ATOM, RSS)
@@ -2392,7 +2420,9 @@
if parse_failures == len(order) or feed_id is None:
logging.warning('Could not record feed ID for topic = %s:\n%s',
topic, error_traceback)
- known_feed.put()
+ retry_datastore_op(lambda: known_feed.put())
+ # Just give up, since we can't parse it. This case also covers when
+ # the character encoding for the document is unsupported.
return
logging.info('For topic = %s found new feed ID %r; old feed ID was %r',
@@ -2405,7 +2435,7 @@
KnownFeedIdentity.update(feed_id, topic)
known_feed.feed_id = feed_id
- known_feed.put()
+ retry_datastore_op(lambda: known_feed.put())
################################################################################
=======================================
--- /trunk/hub/main_test.py Wed Feb 3 09:48:25 2010
+++ /trunk/hub/main_test.py Wed Feb 3 10:26:36 2010
@@ -109,6 +109,17 @@
u'/07256788297315478906/label/\u30d6\u30ed\u30b0\u8846')
self.assertEquals(good_iri, main.normalize_iri(iri))
+ def testRetryDatastoreOp(self):
+ """Tests the retry_datastore_op function."""
+ tries = [0]
+ def my_func():
+ tries[0] += 1
+ raise db.Error('Doh')
+ self.assertRaises(db.Error,
+ main.retry_datastore_op,
+ my_func)
+ self.assertEquals(5, tries[0])
+
################################################################################
class TestWorkQueueHandler(webapp.RequestHandler):
@@ -1741,6 +1752,23 @@
tasks.extend(testutil.get_tasks(main.FEED_RETRIES_QUEUE,
expected_count=1))
    self.assertEquals([self.topic] * 2, [t['params']['topic'] for t in tasks])
+ def testRedirectToBadUrl(self):
+ """Tests when the redirect URL is bad."""
+ info = FeedRecord.get_or_create(self.topic)
+ info.update(self.headers)
+ info.put()
+ FeedToFetch.insert([self.topic])
+
+ real_topic = '/not/a/valid-redirect-location'
+ self.headers['Location'] = real_topic
+ urlfetch_test_stub.instance.expect(
+ 'get', self.topic, 302, '',
+ response_headers=self.headers.copy())
+
+ self.handle('post', ('topic', self.topic))
+ self.assertTrue(EventToDeliver.all().get() is None)
+ testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
+
def testPutSplitting(self):
"""Tests that put() calls for feed records are split when too large."""
# Make the content way too big.
@@ -1918,6 +1946,20 @@
feed = FeedToFetch.get_by_key_name(get_hash_key_name(topic))
self.assertTrue(feed is None)
+ def testPullBadEncoding(self):
+ """Tests when the content has a bad character encoding."""
+ data = ('<?xml version="1.0" encoding="x-windows-874"?>\n'
+ '<feed><my header="data"/>'
+ '<entry><id>1</id><updated>123</updated>wooh</entry></feed>')
+ topic = 'http://example.com/my-topic'
+ callback = 'http://example.com/my-subscriber'
+    self.assertTrue(Subscription.insert(callback, topic, 'token', 'secret'))
+ FeedToFetch.insert([topic])
+ urlfetch_test_stub.instance.expect('get', topic, 200, data)
+ self.handle('post', ('topic', topic))
+ feed = FeedToFetch.get_by_key_name(get_hash_key_name(topic))
+ self.assertTrue(feed is None)
+
def testPullGoodAtom(self):
"""Tests when the Atom XML can parse just fine."""
    data = ('<?xml version="1.0" encoding="utf-8"?>\n<feed><my header="data"/>'