Revision: 408
Author: bslatkin
Date: Tue Jul 26 23:41:24 2011
Log: Incorporates various scalability tweaks; removes periodic
EventToDeliver cleanup cronjob
http://code.google.com/p/pubsubhubbub/source/detail?r=408
Modified:
/trunk/hub/cron.yaml
/trunk/hub/index.yaml
/trunk/hub/main.py
/trunk/hub/main_test.py
=======================================
--- /trunk/hub/cron.yaml Mon Oct 26 09:47:02 2009
+++ /trunk/hub/cron.yaml Tue Jul 26 23:41:24 2011
@@ -3,10 +3,6 @@
url: /work/poll_bootstrap
schedule: every 5 minutes
-- description: EventToDeliver cleanup
- url: /work/event_cleanup
- schedule: every 1 minutes
-
- description: Subscription cleanup
url: /work/subscription_cleanup
schedule: every 1 minutes
=======================================
--- /trunk/hub/index.yaml Thu Sep 23 11:15:26 2010
+++ /trunk/hub/index.yaml Tue Jul 26 23:41:24 2011
@@ -7,13 +7,6 @@
- name: topic_hash
- name: callback_hash
-# For displaying failed callbacks for a specific subscription.
-- kind: EventToDeliver
- properties:
- - name: failed_callbacks
- - name: last_modified
- direction: desc
-
# AUTOGENERATED
=======================================
--- /trunk/hub/main.py Tue Jul 26 09:40:18 2011
+++ /trunk/hub/main.py Tue Jul 26 23:41:24 2011
@@ -152,7 +152,7 @@
DELIVERY_RETRY_PERIOD = 30 # seconds
# Period at which feed IDs should be refreshed.
-FEED_IDENTITY_UPDATE_PERIOD = (10 * 24 * 60 * 60) # 10 days
+FEED_IDENTITY_UPDATE_PERIOD = (20 * 24 * 60 * 60) # 20 days
# Number of polling feeds to fetch from the Datastore at a time.
BOOSTRAP_FEED_CHUNK_SIZE = 50
@@ -209,14 +209,14 @@
FETCH_SCORER = dos.UrlScorer(
period=300, # Seconds
min_requests=5, # per second
- max_failure_percentage=1, # TODO: Drop this to something more reasonable!
+ max_failure_percentage=0.8,
prefix='pull_feed')
# Pushing events
DELIVERY_SCORER = dos.UrlScorer(
period=300, # Seconds
min_requests=0.5, # per second
- max_failure_percentage=1, # TODO: Drop this to something more reasonable!
+ max_failure_percentage=0.8,
prefix='deliver_events')
@@ -689,7 +689,6 @@
"""Returns a string containing a random challenge token."""
return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))
-
################################################################################
# Models
@@ -929,7 +928,7 @@
Returns:
True if it has verified subscribers, False otherwise.
"""
- if (cls.all().filter('topic_hash =', sha1_hash(topic))
+ if (cls.all(keys_only=True).filter('topic_hash =', sha1_hash(topic))
.filter('subscription_state =', cls.STATE_VERIFIED).get() is not None):
return True
else:
@@ -1047,7 +1046,7 @@
return db.run_in_transaction(txn)
-class FeedToFetch(db.Model):
+class FeedToFetch(db.Expando):
"""A feed that has new data that needs to be pulled.
The key name of this entity is a get_hash_key_name() hash of the topic URL, so
@@ -1057,7 +1056,7 @@
topic = db.TextProperty(required=True)
eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
fetching_failures = db.IntegerProperty(default=0, indexed=False)
- totally_failed = db.BooleanProperty(default=False)
+ totally_failed = db.BooleanProperty(default=False, indexed=False)
source_keys = db.StringListProperty(indexed=False)
source_values = db.StringListProperty(indexed=False)
work_index = db.IntegerProperty()
@@ -1145,19 +1144,24 @@
retry_period: Initial period for doing exponential (base-2) backoff.
now: Returns the current time as a UTC datetime.
"""
+ orig_failures = self.fetching_failures
def txn():
if self.fetching_failures >= max_failures:
logging.debug('Max fetching failures exceeded, giving up.')
self.totally_failed = True
else:
- retry_delay = retry_period * (2 ** self.fetching_failures)
+ retry_delay = retry_period * (2 ** orig_failures)
logging.debug('Fetching failed. Will retry in %s seconds',
retry_delay)
self.eta = now() + datetime.timedelta(seconds=retry_delay)
- self.fetching_failures += 1
+ self.fetching_failures = orig_failures + 1
self._enqueue_retry_task()
self.put()
- db.run_in_transaction(txn)
+ try:
+ db.run_in_transaction_custom_retries(2, txn)
+ except:
logging.exception('Could not mark feed fetching as a failure: topic=%r',
+ self.topic)
def done(self):
"""The feed fetch has completed successfully.
@@ -1230,11 +1234,11 @@
"""
topic = db.TextProperty(required=True)
- header_footer = db.TextProperty() # Save this for debugging.
- last_updated = db.DateTimeProperty(auto_now=True) # The last polling time.
+ header_footer = db.TextProperty()
+ last_updated = db.DateTimeProperty(auto_now=True, indexed=False)
format = db.TextProperty() # 'atom', 'rss', or 'arbitrary'
- # Content-related headers served by the feed' host.
+ # Content-related headers served by the feed's host.
content_type = db.TextProperty()
last_modified = db.TextProperty()
etag = db.TextProperty()
@@ -1331,6 +1335,7 @@
headers = {
'Cache-Control': 'no-cache no-store max-age=1',
'Connection': 'cache-control',
+ 'Accept': '*/*',
}
if self.last_modified:
headers['If-Modified-Since'] = self.last_modified
@@ -1437,10 +1442,11 @@
topic_hash = db.StringProperty(required=True)
last_callback = db.TextProperty(default='') # For paging Subscriptions
failed_callbacks = db.ListProperty(db.Key) # Refs to Subscription entities
- delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES)
+ delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES,
+ indexed=False)
retry_attempts = db.IntegerProperty(default=0, indexed=False)
- last_modified = db.DateTimeProperty(required=True)
- totally_failed = db.BooleanProperty(default=False)
+ last_modified = db.DateTimeProperty(required=True, indexed=False)
+ totally_failed = db.BooleanProperty(default=False, indexed=False)
content_type = db.TextProperty(default='')
max_failures = db.IntegerProperty(indexed=False)
@@ -1619,13 +1625,17 @@
return
elif not more_callbacks:
self.last_callback = ''
- retry_delay = retry_period * (2 ** self.retry_attempts)
- self.last_modified += datetime.timedelta(seconds=retry_delay)
self.retry_attempts += 1
if self.max_failures is not None:
max_failures = self.max_failures
if self.retry_attempts > max_failures:
self.totally_failed = True
+ else:
+ retry_delay = retry_period * (2 ** (self.retry_attempts-1))
+ try:
+ self.last_modified += datetime.timedelta(seconds=retry_delay)
+ except OverflowError:
+ pass
if self.delivery_mode == EventToDeliver.NORMAL:
logging.debug('Normal delivery done; %d broken callbacks remain',
@@ -1969,7 +1979,12 @@
if topic_set is None:
topic_set = set([known_topic])
output_dict[known_topic] = topic_set
- topic_set.update(identified.topics)
+ # TODO(bslatkin): Test this.
+ if len(identified.topics) > 25:
+ logging.debug('Too many expansion feeds for topic %s: %s',
+ known_topic, identified.topics)
+ else:
+ topic_set.update(identified.topics)
return output_dict
@@ -2235,6 +2250,7 @@
done_callback_queue=POLLING_QUEUE))
+# TODO(bslatkin): Move this to an offline job.
class SubscriptionCleanupHandler(webapp.RequestHandler):
"""Background worker for cleaning up deleted Subscription instances."""
@@ -2367,7 +2383,8 @@
self.response.out.write('MUST supply at least one hub.url parameter')
return
- logging.debug('Publish event for %d URLs: %s', len(urls), urls)
+ logging.debug('Publish event for %d URLs (showing first 25): %s',
+ len(urls), list(urls)[:25])
error = self.receive_publish(urls, 204, 'hub.url')
if error:
self.response.out.write(error)
@@ -2558,8 +2575,9 @@
except (xml.sax.SAXException, feed_diff.Error), e:
error_traceback = traceback.format_exc()
logging.debug(
- 'Could not get entries for content of %d bytes in format "%s":\n%s',
- len(content), format, error_traceback)
+ 'Could not get entries for content of %d bytes in format "%s" '
+ 'for topic %r:\n%s',
+ len(content), format, feed_record.topic, error_traceback)
parse_failures += 1
except LookupError, e:
error_traceback = traceback.format_exc()
@@ -2570,7 +2588,8 @@
return true_on_bad_feed
if parse_failures == len(order):
- logging.error('Could not parse feed; giving up:\n%s', error_traceback)
+ logging.error('Could not parse feed %r; giving up:\n%s',
+ feed_record.topic, error_traceback)
# That's right, we return True. This will cause the fetch to be
# abandoned on parse failures because the feed is beyond hope!
return true_on_bad_feed
@@ -2580,7 +2599,7 @@
# separate EventToDeliver entities to be inserted for the feed pulls, each
# containing a separate subset of the data.
if len(entities_to_save) > MAX_NEW_FEED_ENTRY_RECORDS:
- logging.warning('Found more entities than we can process for topic %s; '
+ logging.warning('Found more entities than we can process for topic %r; '
'splitting', feed_record.topic)
entities_to_save = entities_to_save[:MAX_NEW_FEED_ENTRY_RECORDS]
entry_payloads = entry_payloads[:MAX_NEW_FEED_ENTRY_RECORDS]
@@ -2625,8 +2644,8 @@
try:
db.put(group)
except (db.BadRequestError, apiproxy_errors.RequestTooLargeError):
- logging.exception('Could not insert %d entities; splitting in half',
- len(group))
+ logging.exception('Could not insert %d entities for topic %r; '
+ 'splitting in half', len(group), feed_record.topic)
# Insert the first half at the beginning since we need to make sure that
# the EventToDeliver gets inserted first.
all_entities.insert(0, group[len(group)/2:])
@@ -2904,30 +2923,6 @@
work.update(more_subscribers, failed_callbacks)
-
-class EventCleanupHandler(webapp.RequestHandler):
- """Background worker for cleaning up expired EventToDeliver instances."""
-
- def __init__(self, now=datetime.datetime.utcnow):
- """Initializer."""
- webapp.RequestHandler.__init__(self)
- self.now = now
-
- @work_queue_only
- def get(self):
- threshold = (self.now() -
- datetime.timedelta(seconds=EVENT_CLEANUP_MAX_AGE_SECONDS))
- events = (EventToDeliver.all()
- .filter('last_modified <=', threshold)
- .order('last_modified').fetch(EVENT_CLEANUP_CHUNK_SIZE))
- if events:
- logging.info('Cleaning up %d events older than %s',
- len(events), threshold)
- try:
- db.delete(events)
- except (db.Error, apiproxy_errors.Error, runtime.DeadlineExceededError):
- logging.exception('Could not clean-up EventToDeliver instances')
-
################################################################################
def take_polling_action(topic_list, poll_type):
@@ -3217,7 +3212,6 @@
else:
failed_events = (EventToDeliver.all()
.filter('failed_callbacks =', subscription.key())
- .order('-last_modified')
.fetch(25))
delivery_score = DELIVERY_SCORER.filter([callback_url])[0]
=======================================
--- /trunk/hub/main_test.py Tue Jul 26 09:40:18 2011
+++ /trunk/hub/main_test.py Tue Jul 26 23:41:24 2011
@@ -2224,7 +2224,8 @@
self.assertEquals(data.replace('\n', ''),
event.payload.replace('\n', ''))
self.assertEquals('application/atom+xml', event.content_type)
self.assertEquals(
- {'Connection': 'cache-control',
+ {'Accept': '*/*',
+ 'Connection': 'cache-control',
'Cache-Control': 'no-cache no-store max-age=1'},
FeedRecord.all().get().get_request_headers(0))
@@ -2787,49 +2788,6 @@
finally:
dos.DISABLE_FOR_TESTING = True
-
-class EventCleanupHandlerTest(testutil.HandlerTestBase):
- """Tests for the EventCleanupHandler worker."""
-
- def setUp(self):
- """Sets up the test harness."""
- self.now = datetime.datetime.utcnow()
- self.expire_time = self.now - datetime.timedelta(
- seconds=main.EVENT_CLEANUP_MAX_AGE_SECONDS)
- def create_handler():
- return main.EventCleanupHandler(now=lambda: self.now)
- self.handler_class = create_handler
- testutil.HandlerTestBase.setUp(self)
- self.topic = 'http://example.com/mytopic'
- self.header_footer = '<feed></feed>'
-
- def testEventCleanupTooYoung(self):
- """Tests when there are events present, but they're too young to
remove."""
- event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, 'application/atom+xml',
- self.header_footer, [])
- event.last_modified = self.expire_time + datetime.timedelta(seconds=1)
- event.put()
- self.handle('get')
- self.assertTrue(db.get(event.key()) is not None)
-
- def testEventCleanupOldEnough(self):
- """Tests when there are events old enough to clean up."""
- event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, 'application/atom+xml',
- self.header_footer, [])
- event.last_modified = self.expire_time
- event.put()
-
- too_young_event = EventToDeliver.create_event_for_topic(
- self.topic + 'blah', main.ATOM, 'application/atom+xml',
- self.header_footer, [])
- too_young_event.put()
-
- self.handle('get')
- self.assertTrue(db.get(event.key()) is None)
- self.assertTrue(db.get(too_young_event.key()) is not None)
-
################################################################################
class SubscribeHandlerTest(testutil.HandlerTestBase):