Revision: 353
Author: bslatkin
Date: Thu Apr 29 17:01:08 2010
Log: hub task insertion transactional
http://code.google.com/p/pubsubhubbub/source/detail?r=353
Modified:
/trunk/hub/main.py
/trunk/hub/main_test.py
=======================================
--- /trunk/hub/main.py Mon Mar 15 23:00:57 2010
+++ /trunk/hub/main.py Thu Apr 29 17:01:08 2010
@@ -684,21 +684,6 @@
return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))
-def retry_datastore_op(func, retries=5):
- """Executes the given function and retries it on Datastore errors.
-
- Args:
- func: Callable to do the datastore operation.
- retries: How many times to retry the operation.
- """
- for i in xrange(retries):
- try:
- return func()
- except db.Error:
- logging.exception('Error running Datastore operation, attempt %d', i+1)
- if i == (retries-1):
- raise
-
################################################################################
# Models
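The retry_datastore_op helper above goes away because the writes it wrapped now run inside db.run_in_transaction, which already re-runs the transaction function on contention before raising. A minimal sketch of that pattern (the Counter model and bump function are hypothetical, for illustration only):

from google.appengine.ext import db

class Counter(db.Model):
  count = db.IntegerProperty(default=0)

def bump(key_name):
  """Increments a Counter atomically; no hand-rolled retry loop needed."""
  def txn():
    counter = Counter.get_by_key_name(key_name)
    if counter is None:
      counter = Counter(key_name=key_name)
    counter.count += 1
    counter.put()
    return counter.count
  # run_in_transaction retries txn() on transaction collisions by default.
  return db.run_in_transaction(txn)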
@@ -811,9 +796,9 @@
"""Records that a callback URL needs verification before being
subscribed.
Creates a new subscription request (for asynchronous verification) if
None
- already exists. Any existing subscription request will not be modified;
+ already exists. Any existing subscription request will be overridden;
for instance, if a subscription has already been verified, this method
- will do nothing.
+ will cause it to be reconfirmed.
Args:
callback: URL that will receive callbacks.
@@ -852,13 +837,12 @@
now() + datetime.timedelta(seconds=lease_seconds)))
sub.confirm_failures = 0
sub.put()
- return (sub_is_new, sub)
- new, sub = db.run_in_transaction(txn)
- # Note: This enqueuing must come *after* the transaction is submitted, or
- # else we'll actually run the task *before* the transaction is submitted.
- sub.enqueue_task(cls.STATE_VERIFIED, verify_token, secret=secret,
- auto_reconfirm=auto_reconfirm)
- return new
+ sub.enqueue_task(cls.STATE_VERIFIED,
+ verify_token,
+ secret=secret,
+ auto_reconfirm=auto_reconfirm)
+ return sub_is_new
+ return db.run_in_transaction(txn)
@classmethod
def remove(cls, callback, topic):
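The hunk above is the heart of this change: enqueue_task is now called from inside the transaction function and (per a later hunk) adds its task with transactional=True, so the verification task is only enqueued if the datastore commit succeeds, and the old "enqueue after the transaction" ordering comment no longer applies. A standalone sketch of the pattern with a hypothetical model and worker URL (on SDKs of this era the import path is google.appengine.api.labs.taskqueue; newer SDKs use google.appengine.api.taskqueue):

from google.appengine.ext import db
from google.appengine.api.labs import taskqueue

class PendingSubscription(db.Model):  # hypothetical model for illustration
  callback = db.TextProperty(required=True)

def request_insert(key_name, callback):
  """Writes the entity and enqueues its follow-up task atomically."""
  def txn():
    sub = PendingSubscription(key_name=key_name, callback=callback)
    sub.put()
    taskqueue.Task(
        url='/work/confirm_subscription',   # hypothetical worker URL
        params={'subscription_key_name': key_name}
    ).add('default', transactional=True)
    return True
  return db.run_in_transaction(txn)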
@@ -906,14 +890,11 @@
if sub is not None:
sub.confirm_failures = 0
sub.put()
- return (True, sub)
- return (False, sub)
- removed, sub = db.run_in_transaction(txn)
- # Note: This enqueuing must come *after* the transaction is submitted, or
- # else we'll actually run the task *before* the transaction is submitted.
- if sub:
- sub.enqueue_task(cls.STATE_TO_DELETE, verify_token)
- return removed
+ sub.enqueue_task(cls.STATE_TO_DELETE, verify_token)
+ return True
+ else:
+ return False
+ return db.run_in_transaction(txn)
@classmethod
def archive(cls, callback, topic):
@@ -990,7 +971,6 @@
The new secret to use for this subscription after successful
confirmation.
"""
- # TODO(bslatkin): Remove these retries when they're not needed in userland.
RETRIES = 3
if os.environ.get('HTTP_X_APPENGINE_QUEUENAME') == POLLING_QUEUE:
target_queue = POLLING_QUEUE
@@ -1006,7 +986,7 @@
'verify_token': verify_token,
'secret': secret or '',
'auto_reconfirm': str(auto_reconfirm)}
- ).add(target_queue)
+ ).add(target_queue, transactional=True)
except (taskqueue.Error, apiproxy_errors.Error):
logging.exception('Could not insert task to confirm '
'topic = %s, callback = %s',
@@ -1044,19 +1024,21 @@
True if this Subscription confirmation should be retried again. Returns False if we should give up and never try again.
"""
- if self.confirm_failures >= max_failures:
- logging.warning('Max subscription failures exceeded, giving up.')
- return False
- else:
- retry_delay = retry_period * (2 ** self.confirm_failures)
- self.eta = now() + datetime.timedelta(seconds=retry_delay)
- self.confirm_failures += 1
- retry_datastore_op(lambda: self.put())
- # TODO(bslatkin): Do this enqueuing transactionally.
- self.enqueue_task(next_state, verify_token,
+ def txn():
+ if self.confirm_failures >= max_failures:
+ logging.warning('Max subscription failures exceeded, giving up.')
+ return False
+ else:
+ retry_delay = retry_period * (2 ** self.confirm_failures)
+ self.eta = now() + datetime.timedelta(seconds=retry_delay)
+ self.confirm_failures += 1
+ self.put()
+ self.enqueue_task(next_state,
+ verify_token,
auto_reconfirm=auto_reconfirm,
secret=secret)
return True
+ return db.run_in_transaction(txn)
class FeedToFetch(db.Model):
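confirm_failed keeps the same base-2 exponential backoff but now updates the entity and re-enqueues the retry task in a single transaction. For reference, the delay is retry_period * (2 ** confirm_failures), so it roughly doubles on every failure; a small sketch of that arithmetic (backoff_eta is a made-up name for illustration):

import datetime

def backoff_eta(retry_period, failures, now):
  """Next-retry ETA using base-2 exponential backoff."""
  retry_delay = retry_period * (2 ** failures)
  return now + datetime.timedelta(seconds=retry_delay)

start = datetime.datetime(2010, 4, 29)
# Delays for the first three failures with a 30-second base period:
print [backoff_eta(30, n, start) - start for n in range(3)]
# [datetime.timedelta(0, 30), datetime.timedelta(0, 60), datetime.timedelta(0, 120)]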
@@ -1067,11 +1049,11 @@
"""
topic = db.TextProperty(required=True)
- eta = db.DateTimeProperty(auto_now_add=True)
- fetching_failures = db.IntegerProperty(default=0)
+ eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
+ fetching_failures = db.IntegerProperty(default=0, indexed=False)
totally_failed = db.BooleanProperty(default=False)
- source_keys = db.StringListProperty()
- source_values = db.StringListProperty()
+ source_keys = db.StringListProperty(indexed=False)
+ source_values = db.StringListProperty(indexed=False)
# TODO(bslatkin): Add fetching failure reason (urlfetch, parsing, etc) and
# surface it on the topic details page.
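Marking these properties indexed=False tells the datastore not to write single-property index rows for them, which makes each put() of a FeedToFetch cheaper; they are only read back off the entity itself, never filtered or sorted on. A brief sketch of the idea with a made-up model:

from google.appengine.ext import db

class WorkItem(db.Model):  # hypothetical model for illustration
  # Never queried on, so skip the index writes for these fields.
  eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
  failures = db.IntegerProperty(default=0, indexed=False)
  # Still used in query filters, so it keeps its index.
  totally_failed = db.BooleanProperty(default=False)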
@@ -1113,11 +1095,8 @@
source_keys=list(source_keys),
source_values=list(source_values))
for topic in set(topic_list)]
- retry_datastore_op(lambda: db.put(feed_list))
- # TODO(bslatkin): Use a bulk interface or somehow merge combined fetches
- # into a single task.
- for feed in feed_list:
- feed._enqueue_task()
+ db.put(feed_list)
+ FeedToFetch._enqueue_tasks(feed_list)
def fetch_failed(self,
max_failures=MAX_FEED_PULL_FAILURES,
@@ -1133,18 +1112,18 @@
retry_period: Initial period for doing exponential (base-2) backoff.
now: Returns the current time as a UTC datetime.
"""
- if self.fetching_failures >= max_failures:
- logging.warning('Max fetching failures exceeded, giving up.')
- self.totally_failed = True
- retry_datastore_op(lambda: self.put())
- else:
- retry_delay = retry_period * (2 ** self.fetching_failures)
- logging.warning('Fetching failed. Will retry in %s seconds', retry_delay)
- self.eta = now() + datetime.timedelta(seconds=retry_delay)
- self.fetching_failures += 1
- retry_datastore_op(lambda: self.put())
- # TODO(bslatkin): Do this enqueuing transactionally.
- self._enqueue_task()
+ def txn():
+ if self.fetching_failures >= max_failures:
+ logging.warning('Max fetching failures exceeded, giving up.')
+ self.totally_failed = True
+ else:
+ retry_delay = retry_period * (2 ** self.fetching_failures)
+ logging.warning('Fetching failed. Will retry in %s seconds', retry_delay)
+ self.eta = now() + datetime.timedelta(seconds=retry_delay)
+ self.fetching_failures += 1
+ FeedToFetch._enqueue_tasks([self])
+ self.put()
+ db.run_in_transaction(txn)
def done(self):
"""The feed fetch has completed successfully.
@@ -1166,30 +1145,41 @@
return False
return db.run_in_transaction(txn)
- def _enqueue_task(self):
- """Enqueues a task to fetch this feed."""
- # TODO(bslatkin): Remove these retries when they're not needed in userland.
- RETRIES = 3
- if self.fetching_failures > 0:
- target_queue = FEED_RETRIES_QUEUE
- elif os.environ.get('HTTP_X_APPENGINE_QUEUENAME') == POLLING_QUEUE:
- target_queue = POLLING_QUEUE
- else:
- target_queue = FEED_QUEUE
- for i in xrange(RETRIES):
- try:
- taskqueue.Task(
- url='/work/pull_feeds',
- eta=self.eta,
- params={'topic': self.topic}
- ).add(target_queue)
- except (taskqueue.Error, apiproxy_errors.Error):
- logging.exception('Could not insert task to fetch topic = %s',
- self.topic)
- if i == (RETRIES - 1):
- raise
+ def _get_task(self):
+ """Creates a task that will fetch this feed."""
+ return taskqueue.Task(
+ url='/work/pull_feeds',
+ eta=self.eta,
+ params={'topic': self.topic})
+
+ @staticmethod
+ def _enqueue_tasks(feed_list, transactional=False):
+ """Enqueues a task to fetch the given feeds."""
+ task_dict = {}
+ for feed in feed_list:
+ if feed.fetching_failures > 0:
+ target_queue = FEED_RETRIES_QUEUE
+ elif os.environ.get('HTTP_X_APPENGINE_QUEUENAME') == POLLING_QUEUE:
+ target_queue = POLLING_QUEUE
else:
- return
+ target_queue = FEED_QUEUE
+ task_dict.setdefault(target_queue, []).append(feed)
+
+ RETRIES = 3
+ for queue_name, feed_list in task_dict.iteritems():
+ for i in xrange(RETRIES):
+ try:
+ taskqueue.Queue(queue_name).add(
+ [f._get_task() for f in feed_list],
+ transactional=transactional)
+ except (taskqueue.Error, apiproxy_errors.Error):
+ logging.exception('Could not insert tasks on queue = %r '
+ 'to fetch topics = %r',
+ queue_name, [f.topic for f in feed_list])
+ if i == (RETRIES - 1):
+ raise
+ else:
+ return
class FeedRecord(db.Model):
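The new _enqueue_tasks replaces the per-feed _enqueue_task: it buckets feeds by destination queue and then issues one Queue.add() call per queue with a list of tasks, which cuts the number of task-queue RPCs and lets the adds participate in a transaction. A simplified sketch of the bucketing pattern (enqueue_grouped and the (queue_name, task) input shape are assumptions for illustration):

from google.appengine.api.labs import taskqueue

def enqueue_grouped(work_items, transactional=False):
  """Adds tasks in one batch per target queue.

  work_items: list of (queue_name, taskqueue.Task) pairs; a real caller
  would pick the queue per item (retry vs. polling vs. normal traffic).
  """
  by_queue = {}
  for queue_name, task in work_items:
    by_queue.setdefault(queue_name, []).append(task)
  for queue_name, tasks in by_queue.iteritems():
    # One RPC per queue instead of one per task.
    taskqueue.Queue(queue_name).add(tasks, transactional=transactional)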
@@ -1507,7 +1497,7 @@
if not more_callbacks and not self.failed_callbacks:
logging.info('EventToDeliver complete: topic = %s, delivery_mode = %s', self.topic, self.delivery_mode)
- retry_datastore_op(lambda: self.delete())
+ self.delete()
return
elif not more_callbacks:
self.last_callback = ''
@@ -1528,14 +1518,14 @@
len(self.failed_callbacks), self.last_modified,
self.totally_failed)
- retry_datastore_op(lambda: self.put())
- if not self.totally_failed:
- # TODO(bslatkin): Do this enqueuing transactionally.
- self.enqueue()
+ def txn():
+ self.put()
+ if not self.totally_failed:
+ self.enqueue()
+ db.run_in_transaction(txn)
def enqueue(self):
"""Enqueues a Task that will execute this EventToDeliver."""
- # TODO(bslatkin): Remove these retries when they're not needed in userland.
RETRIES = 3
if self.delivery_mode == EventToDeliver.RETRY:
target_queue = EVENT_RETRIES_QUEUE
@@ -1549,7 +1539,7 @@
url='/work/push_events',
eta=self.last_modified,
params={'event_key': self.key()}
- ).add(target_queue)
+ ).add(target_queue, transactional=True)
except (taskqueue.Error, apiproxy_errors.Error):
logging.exception('Could not insert task to deliver '
'events for topic = %s', self.topic)
@@ -2186,7 +2176,7 @@
def get(self):
self.response.out.write(template.render('publish_debug.html', {}))
- @dos.limit(count=100, period=1) # TODO(bslatkin): need whitelist
+ @dos.limit(count=100, period=1)
def post(self):
self.response.headers['Content-Type'] = 'text/plain'
@@ -2412,6 +2402,8 @@
all_entities.insert(0, group[len(group)/2:])
all_entities.insert(0, group[:len(group)/2])
raise
+ if event_to_deliver:
+ event_to_deliver.enqueue()
for i in xrange(PUT_SPLITTING_ATTEMPTS):
try:
@@ -2424,11 +2416,6 @@
'request size; dropping event for %s',
feed_record.topic)
return True
- # TODO(bslatkin): Make this transactional with the call to work.done()
- # that happens in the PullFeedHandler.post() method.
- if event_to_deliver:
- event_to_deliver.enqueue()
-
# Inform any hooks that there is a new event to deliver that has
# been recorded and delivery has begun.
hooks.execute(inform_event, event_to_deliver)
@@ -2614,7 +2601,7 @@
continue
headers = {
- # TODO(bslatkin): Remove the 'or' here once migration is done.
+ # In case there was no content type header.
'Content-Type': work.content_type or 'text/xml',
# TODO(bslatkin): add a better test for verify_token here.
'X-Hub-Signature': 'sha1=%s' % sha1_hmac(
@@ -2751,7 +2738,7 @@
current_key = None
################################################################################
-# feed canonicalization
+# Feed canonicalization
class RecordFeedHandler(webapp.RequestHandler):
"""Background worker for categorizing/classifying feed URLs by their
ID."""
@@ -2786,14 +2773,14 @@
response = urlfetch.fetch(topic)
except (apiproxy_errors.Error, urlfetch.Error):
logging.exception('Could not fetch topic = %s for feed ID', topic)
- retry_datastore_op(lambda: known_feed.put())
+ known_feed.put()
return
# TODO(bslatkin): Add more intelligent retrying of feed identification.
if response.status_code != 200:
logging.warning('Fetching topic = %s for feed ID returned response %s', topic, response.status_code)
- retry_datastore_op(lambda: known_feed.put())
+ known_feed.put()
return
order = (ATOM, RSS)
@@ -2816,7 +2803,7 @@
if parse_failures == len(order) or feed_id is None:
logging.warning('Could not record feed ID for topic = %s:\n%s',
topic, error_traceback)
- retry_datastore_op(lambda: known_feed.put())
+ known_feed.put()
# Just give up, since we can't parse it. This case also covers when
# the character encoding for the document is unsupported.
return
@@ -2831,7 +2818,7 @@
KnownFeedIdentity.update(feed_id, topic)
known_feed.feed_id = feed_id
- retry_datastore_op(lambda: known_feed.put())
+ known_feed.put()
################################################################################
=======================================
--- /trunk/hub/main_test.py Fri Feb 26 11:54:31 2010
+++ /trunk/hub/main_test.py Thu Apr 29 17:01:08 2010
@@ -109,17 +109,6 @@
u'/07256788297315478906/label/\u30d6\u30ed\u30b0\u8846')
self.assertEquals(good_iri, main.normalize_iri(iri))
- def testRetryDatastoreOp(self):
- """Tests the retry_datastore_op function."""
- tries = [0]
- def my_func():
- tries[0] += 1
- raise db.Error('Doh')
- self.assertRaises(db.Error,
- main.retry_datastore_op,
- my_func)
- self.assertEquals(5, tries[0])
-
################################################################################
class TestWorkQueueHandler(webapp.RequestHandler):
@@ -1130,11 +1119,14 @@
def testQueuePreserved(self):
"""Tests that enqueueing an EventToDeliver preserves the polling
queue."""
event, work_key, sub_list, sub_keys = self.insert_subscriptions()
- event.enqueue()
+ def txn():
+ event.enqueue()
+ db.run_in_transaction(txn)
+
testutil.get_tasks(main.EVENT_QUEUE, expected_count=1)
os.environ['HTTP_X_APPENGINE_QUEUENAME'] = main.POLLING_QUEUE
try:
- event.enqueue()
+ db.run_in_transaction(txn)
finally:
del os.environ['HTTP_X_APPENGINE_QUEUENAME']
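Because EventToDeliver.enqueue() now adds its task with transactional=True, it is only valid while a datastore transaction is open, which is why the test wraps the call in db.run_in_transaction. A tiny helper capturing that pattern (hypothetical, for illustration):

from google.appengine.ext import db

def enqueue_in_txn(event):
  """Runs event.enqueue() inside a transaction; the transactional task
  add would be rejected if called outside of one."""
  return db.run_in_transaction(event.enqueue)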