Revision: 408
Author:   bslatkin
Date:     Tue Jul 26 23:41:24 2011
Log: Incorporates various scalability tweaks; removes periodic EventToDeliver cleanup cronjob
http://code.google.com/p/pubsubhubbub/source/detail?r=408

Modified:
 /trunk/hub/cron.yaml
 /trunk/hub/index.yaml
 /trunk/hub/main.py
 /trunk/hub/main_test.py

=======================================
--- /trunk/hub/cron.yaml        Mon Oct 26 09:47:02 2009
+++ /trunk/hub/cron.yaml        Tue Jul 26 23:41:24 2011
@@ -3,10 +3,6 @@
   url: /work/poll_bootstrap
   schedule: every 5 minutes

-- description: EventToDeliver cleanup
-  url: /work/event_cleanup
-  schedule: every 1 minutes
-
 - description: Subscription cleanup
   url: /work/subscription_cleanup
   schedule: every 1 minutes
=======================================
--- /trunk/hub/index.yaml       Thu Sep 23 11:15:26 2010
+++ /trunk/hub/index.yaml       Tue Jul 26 23:41:24 2011
@@ -7,13 +7,6 @@
   - name: topic_hash
   - name: callback_hash

-# For displaying failed callbacks for a specific subscription.
-- kind: EventToDeliver
-  properties:
-  - name: failed_callbacks
-  - name: last_modified
-    direction: desc
-

 # AUTOGENERATED

=======================================
--- /trunk/hub/main.py  Tue Jul 26 09:40:18 2011
+++ /trunk/hub/main.py  Tue Jul 26 23:41:24 2011
@@ -152,7 +152,7 @@
 DELIVERY_RETRY_PERIOD = 30 # seconds

 # Period at which feed IDs should be refreshed.
-FEED_IDENTITY_UPDATE_PERIOD = (10 * 24 * 60 * 60) # 10 days
+FEED_IDENTITY_UPDATE_PERIOD = (20 * 24 * 60 * 60) # 20 days

 # Number of polling feeds to fetch from the Datastore at a time.
 BOOSTRAP_FEED_CHUNK_SIZE = 50
@@ -209,14 +209,14 @@
 FETCH_SCORER = dos.UrlScorer(
   period=300,  # Seconds
   min_requests=5,  # per second
-  max_failure_percentage=1,  # TODO: Drop this to something more reasonable!
+  max_failure_percentage=0.8,
   prefix='pull_feed')

 # Pushing events
 DELIVERY_SCORER = dos.UrlScorer(
   period=300,  # Seconds
   min_requests=0.5,  # per second
-  max_failure_percentage=1,  # TODO: Drop this to something more reasonable!
+  max_failure_percentage=0.8,
   prefix='deliver_events')


@@ -689,7 +689,6 @@
   """Returns a string containing a random challenge token."""
   return ''.join(random.choice(_VALID_CHARS) for i in xrange(128))

-
 ################################################################################
 # Models

@@ -929,7 +928,7 @@
     Returns:
       True if it has verified subscribers, False otherwise.
     """
-    if (cls.all().filter('topic_hash =', sha1_hash(topic))
+    if (cls.all(keys_only=True).filter('topic_hash =', sha1_hash(topic))
         .filter('subscription_state =', cls.STATE_VERIFIED).get() is not None):
       return True
     else:
@@ -1047,7 +1046,7 @@
     return db.run_in_transaction(txn)


-class FeedToFetch(db.Model):
+class FeedToFetch(db.Expando):
   """A feed that has new data that needs to be pulled.

   The key name of this entity is a get_hash_key_name() hash of the topic URL, so
@@ -1057,7 +1056,7 @@
   topic = db.TextProperty(required=True)
   eta = db.DateTimeProperty(auto_now_add=True, indexed=False)
   fetching_failures = db.IntegerProperty(default=0, indexed=False)
-  totally_failed = db.BooleanProperty(default=False)
+  totally_failed = db.BooleanProperty(default=False, indexed=False)
   source_keys = db.StringListProperty(indexed=False)
   source_values = db.StringListProperty(indexed=False)
   work_index = db.IntegerProperty()
@@ -1145,19 +1144,24 @@
       retry_period: Initial period for doing exponential (base-2) backoff.
       now: Returns the current time as a UTC datetime.
     """
+    orig_failures = self.fetching_failures
     def txn():
       if self.fetching_failures >= max_failures:
         logging.debug('Max fetching failures exceeded, giving up.')
         self.totally_failed = True
       else:
-        retry_delay = retry_period * (2 ** self.fetching_failures)
+        retry_delay = retry_period * (2 ** orig_failures)
         logging.debug('Fetching failed. Will retry in %s seconds',
                       retry_delay)
         self.eta = now() + datetime.timedelta(seconds=retry_delay)
-        self.fetching_failures += 1
+        self.fetching_failures = orig_failures + 1
         self._enqueue_retry_task()
       self.put()
-    db.run_in_transaction(txn)
+    try:
+      db.run_in_transaction_custom_retries(2, txn)
+    except:
+      logging.exception('Could not mark feed fetching as a failure: topic=%r',
+                        self.topic)

   def done(self):
     """The feed fetch has completed successfully.
@@ -1230,11 +1234,11 @@
   """

   topic = db.TextProperty(required=True)
-  header_footer = db.TextProperty()  # Save this for debugging.
-  last_updated = db.DateTimeProperty(auto_now=True)  # The last polling time.
+  header_footer = db.TextProperty()
+  last_updated = db.DateTimeProperty(auto_now=True, indexed=False)
   format = db.TextProperty()  # 'atom', 'rss', or 'arbitrary'

-  # Content-related headers served by the feed' host.
+  # Content-related headers served by the feed's host.
   content_type = db.TextProperty()
   last_modified = db.TextProperty()
   etag = db.TextProperty()
@@ -1331,6 +1335,7 @@
     headers = {
       'Cache-Control': 'no-cache no-store max-age=1',
       'Connection': 'cache-control',
+      'Accept': '*/*',
     }
     if self.last_modified:
       headers['If-Modified-Since'] = self.last_modified
@@ -1437,10 +1442,11 @@
   topic_hash = db.StringProperty(required=True)
   last_callback = db.TextProperty(default='')  # For paging Subscriptions
   failed_callbacks = db.ListProperty(db.Key)  # Refs to Subscription entities
-  delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES)
+  delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES,
+                                    indexed=False)
   retry_attempts = db.IntegerProperty(default=0, indexed=False)
-  last_modified = db.DateTimeProperty(required=True)
-  totally_failed = db.BooleanProperty(default=False)
+  last_modified = db.DateTimeProperty(required=True, indexed=False)
+  totally_failed = db.BooleanProperty(default=False, indexed=False)
   content_type = db.TextProperty(default='')
   max_failures = db.IntegerProperty(indexed=False)

@@ -1619,13 +1625,17 @@
       return
     elif not more_callbacks:
       self.last_callback = ''
-      retry_delay = retry_period * (2 ** self.retry_attempts)
-      self.last_modified += datetime.timedelta(seconds=retry_delay)
       self.retry_attempts += 1
       if self.max_failures is not None:
         max_failures = self.max_failures
       if self.retry_attempts > max_failures:
         self.totally_failed = True
+      else:
+        retry_delay = retry_period * (2 ** (self.retry_attempts-1))
+        try:
+          self.last_modified += datetime.timedelta(seconds=retry_delay)
+        except OverflowError:
+          pass

       if self.delivery_mode == EventToDeliver.NORMAL:
         logging.debug('Normal delivery done; %d broken callbacks remain',
@@ -1969,7 +1979,12 @@
         if topic_set is None:
           topic_set = set([known_topic])
           output_dict[known_topic] = topic_set
-        topic_set.update(identified.topics)
+        # TODO(bslatkin): Test this.
+        if len(identified.topics) > 25:
+          logging.debug('Too many expansion feeds for topic %s: %s',
+                        known_topic, identified.topics)
+        else:
+          topic_set.update(identified.topics)

     return output_dict

@@ -2235,6 +2250,7 @@
           done_callback_queue=POLLING_QUEUE))


+# TODO(bslatkin): Move this to an offline job.
 class SubscriptionCleanupHandler(webapp.RequestHandler):
   """Background worker for cleaning up deleted Subscription instances."""

@@ -2367,7 +2383,8 @@
       self.response.out.write('MUST supply at least one hub.url parameter')
       return

-    logging.debug('Publish event for %d URLs: %s', len(urls), urls)
+    logging.debug('Publish event for %d URLs (showing first 25): %s',
+                  len(urls), list(urls)[:25])
     error = self.receive_publish(urls, 204, 'hub.url')
     if error:
       self.response.out.write(error)
@@ -2558,8 +2575,9 @@
     except (xml.sax.SAXException, feed_diff.Error), e:
       error_traceback = traceback.format_exc()
       logging.debug(
-          'Could not get entries for content of %d bytes in format "%s":\n%s',
-          len(content), format, error_traceback)
+          'Could not get entries for content of %d bytes in format "%s" '
+          'for topic %r:\n%s',
+          len(content), format, feed_record.topic, error_traceback)
       parse_failures += 1
     except LookupError, e:
       error_traceback = traceback.format_exc()
@@ -2570,7 +2588,8 @@
       return true_on_bad_feed

   if parse_failures == len(order):
-    logging.error('Could not parse feed; giving up:\n%s', error_traceback)
+    logging.error('Could not parse feed %r; giving up:\n%s',
+                  feed_record.topic, error_traceback)
     # That's right, we return True. This will cause the fetch to be
     # abandoned on parse failures because the feed is beyond hope!
     return true_on_bad_feed
@@ -2580,7 +2599,7 @@
   # separate EventToDeliver entities to be inserted for the feed pulls, each
   # containing a separate subset of the data.
   if len(entities_to_save) > MAX_NEW_FEED_ENTRY_RECORDS:
-    logging.warning('Found more entities than we can process for topic %s; '
+    logging.warning('Found more entities than we can process for topic %r; '
                     'splitting', feed_record.topic)
     entities_to_save = entities_to_save[:MAX_NEW_FEED_ENTRY_RECORDS]
     entry_payloads = entry_payloads[:MAX_NEW_FEED_ENTRY_RECORDS]
@@ -2625,8 +2644,8 @@
       try:
         db.put(group)
       except (db.BadRequestError, apiproxy_errors.RequestTooLargeError):
-        logging.exception('Could not insert %d entities; splitting in half',
-                          len(group))
+        logging.exception('Could not insert %d entities for topic %r; '
+                          'splitting in half', len(group), feed_record.topic)
         # Insert the first half at the beginning since we need to make sure that
         # the EventToDeliver gets inserted first.
         all_entities.insert(0, group[len(group)/2:])
@@ -2904,30 +2923,6 @@

     work.update(more_subscribers, failed_callbacks)

-
-class EventCleanupHandler(webapp.RequestHandler):
-  """Background worker for cleaning up expired EventToDeliver instances."""
-
-  def __init__(self, now=datetime.datetime.utcnow):
-    """Initializer."""
-    webapp.RequestHandler.__init__(self)
-    self.now = now
-
-  @work_queue_only
-  def get(self):
-    threshold = (self.now() -
-        datetime.timedelta(seconds=EVENT_CLEANUP_MAX_AGE_SECONDS))
-    events = (EventToDeliver.all()
-              .filter('last_modified <=', threshold)
-              .order('last_modified').fetch(EVENT_CLEANUP_CHUNK_SIZE))
-    if events:
-      logging.info('Cleaning up %d events older than %s',
-                   len(events), threshold)
-      try:
-        db.delete(events)
-      except (db.Error, apiproxy_errors.Error, runtime.DeadlineExceededError):
-        logging.exception('Could not clean-up EventToDeliver instances')
-
 ################################################################################

 def take_polling_action(topic_list, poll_type):
@@ -3217,7 +3212,6 @@
     else:
       failed_events = (EventToDeliver.all()
         .filter('failed_callbacks =', subscription.key())
-        .order('-last_modified')
         .fetch(25))
       delivery_score = DELIVERY_SCORER.filter([callback_url])[0]

=======================================
--- /trunk/hub/main_test.py     Tue Jul 26 09:40:18 2011
+++ /trunk/hub/main_test.py     Tue Jul 26 23:41:24 2011
@@ -2224,7 +2224,8 @@
     self.assertEquals(data.replace('\n', ''), event.payload.replace('\n', ''))
     self.assertEquals('application/atom+xml', event.content_type)
     self.assertEquals(
-        {'Connection': 'cache-control',
+        {'Accept': '*/*',
+         'Connection': 'cache-control',
          'Cache-Control': 'no-cache no-store max-age=1'},
         FeedRecord.all().get().get_request_headers(0))

@@ -2787,49 +2788,6 @@
     finally:
       dos.DISABLE_FOR_TESTING = True

-
-class EventCleanupHandlerTest(testutil.HandlerTestBase):
-  """Tests for the EventCleanupHandler worker."""
-
-  def setUp(self):
-    """Sets up the test harness."""
-    self.now = datetime.datetime.utcnow()
-    self.expire_time = self.now - datetime.timedelta(
-        seconds=main.EVENT_CLEANUP_MAX_AGE_SECONDS)
-    def create_handler():
-      return main.EventCleanupHandler(now=lambda: self.now)
-    self.handler_class = create_handler
-    testutil.HandlerTestBase.setUp(self)
-    self.topic = 'http://example.com/mytopic'
-    self.header_footer = '<feed></feed>'
-
-  def testEventCleanupTooYoung(self):
-    """Tests when there are events present, but they're too young to remove."""
-    event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, 'application/atom+xml',
-        self.header_footer, [])
-    event.last_modified = self.expire_time + datetime.timedelta(seconds=1)
-    event.put()
-    self.handle('get')
-    self.assertTrue(db.get(event.key()) is not None)
-
-  def testEventCleanupOldEnough(self):
-    """Tests when there are events old enough to clean up."""
-    event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, 'application/atom+xml',
-        self.header_footer, [])
-    event.last_modified = self.expire_time
-    event.put()
-
-    too_young_event = EventToDeliver.create_event_for_topic(
-        self.topic + 'blah', main.ATOM, 'application/atom+xml',
-        self.header_footer, [])
-    too_young_event.put()
-
-    self.handle('get')
-    self.assertTrue(db.get(event.key()) is None)
-    self.assertTrue(db.get(too_young_event.key()) is not None)
-
 ################################################################################

 class SubscribeHandlerTest(testutil.HandlerTestBase):

Reply via email to