Revision: 392
Author: bslatkin
Date: Fri Nov  5 13:41:59 2010
Log: Hub: Adds support for arbitrary content payloads that do not parse as RSS/Atom; there is still no security implementation for HTTP header values.
http://code.google.com/p/pubsubhubbub/source/detail?r=392

Modified:
 /trunk/hub/main.py
 /trunk/hub/main_test.py

=======================================
--- /trunk/hub/main.py  Wed Sep 22 15:16:49 2010
+++ /trunk/hub/main.py  Fri Nov  5 13:41:59 2010
@@ -553,6 +553,7 @@

 ATOM = 'atom'
 RSS = 'rss'
+ARBITRARY = 'arbitrary'

 VALID_PORTS = frozenset([
     '80', '443', '4443', '8080', '8081', '8082', '8083', '8084', '8085',
@@ -1229,7 +1230,7 @@
   topic = db.TextProperty(required=True)
   header_footer = db.TextProperty()  # Save this for debugging.
last_updated = db.DateTimeProperty(auto_now=True) # The last polling time.
-  format = db.TextProperty()  # 'atom' or 'rss'
+  format = db.TextProperty()  # 'atom', 'rss', or 'arbitrary'

   # Content-related headers served by the feed' host.
   content_type = db.TextProperty()
@@ -1291,10 +1292,10 @@
     Args:
headers: Dictionary of response headers from the feed that should be used
         to determine how to poll the feed in the future.
- header_footer: Contents of the feed's XML document minus the entry data;
-        if not supplied, the old value will remain.
+ header_footer: Contents of the feed's XML document minus the entry data.
+        if not supplied, the old value will remain. Only saved for feeds.
       format: The last parsing format that worked correctly for this feed.
-        Should be 'rss' or 'atom'.
+        Should be 'rss', 'atom', or 'arbitrary'.
     """
     try:
       self.content_type = headers.get('Content-Type', '').lower()
@@ -1311,10 +1312,10 @@
     except UnicodeDecodeError:
       logging.exception('ETag header had bad encoding')

-    if header_footer is not None:
-      self.header_footer = header_footer
     if format is not None:
       self.format = format
+    if header_footer is not None and self.format != ARBITRARY:
+      self.header_footer = header_footer

   def get_request_headers(self):
     """Returns the request headers that should be used to pull this feed.
@@ -1439,6 +1440,7 @@
   def create_event_for_topic(cls,
                              topic,
                              format,
+                             content_type,
                              header_footer,
                              entry_payloads,
                              now=datetime.datetime.utcnow,
@@ -1448,9 +1450,12 @@

     Args:
       topic: The topic that had the event.
-      format: Format of the feed, either 'atom' or 'rss'.
+      format: Format of the feed, 'atom', 'rss', or 'arbitrary'.
+      content_type: The original content type of the feed, fetched from the
+        server, if any. May be empty.
       header_footer: The header and footer of the published feed into which
-        the entry list will be spliced.
+        the entry list will be spliced. For arbitrary content this is the
+        full body of the resource.
       entry_payloads: List of strings containing entry payloads (i.e., all
XML data for each entry, including surrounding tags) in order of newest
         to oldest.
@@ -1465,27 +1470,32 @@
     Returns:
       A new EventToDeliver instance that has not been stored.
     """
-    close_index = header_footer.rfind('</')
-    assert close_index != -1, 'Could not find "</" in feed envelope'
-    end_tag = header_footer[close_index:]
-    if 'rss' in end_tag:
-      # RSS needs special handling, since it actually closes with
-      # a combination of </channel></rss> we need to traverse one
-      # level higher.
-      close_index = header_footer[:close_index].rfind('</')
- assert close_index != -1, 'Could not find "</channel>" in feed envelope'
+    if format in (ATOM, RSS):
+      # This is feed XML.
+      close_index = header_footer.rfind('</')
+      assert close_index != -1, 'Could not find "</" in feed envelope'
       end_tag = header_footer[close_index:]
-      content_type = 'application/rss+xml'
-    elif 'feed' in end_tag:
-      content_type = 'application/atom+xml'
-    elif 'rdf' in end_tag:
-      content_type = 'application/rdf+xml'
-
-    payload_list = ['<?xml version="1.0" encoding="utf-8"?>',
-                    header_footer[:close_index]]
-    payload_list.extend(entry_payloads)
-    payload_list.append(header_footer[close_index:])
-    payload = '\n'.join(payload_list)
+      if 'rss' in end_tag:
+        # RSS needs special handling, since it actually closes with
+        # a combination of </channel></rss> we need to traverse one
+        # level higher.
+        close_index = header_footer[:close_index].rfind('</')
+ assert close_index != -1, 'Could not find "</channel>" in feed envelope'
+        end_tag = header_footer[close_index:]
+        content_type = 'application/rss+xml'
+      elif 'feed' in end_tag:
+        content_type = 'application/atom+xml'
+      elif 'rdf' in end_tag:
+        content_type = 'application/rdf+xml'
+
+      payload_list = ['<?xml version="1.0" encoding="utf-8"?>',
+                      header_footer[:close_index]]
+      payload_list.extend(entry_payloads)
+      payload_list.append(header_footer[close_index:])
+      payload = '\n'.join(payload_list)
+    elif format == ARBITRARY:
+      # This is an arbitrary payload.
+      payload = header_footer

     if set_parent:
       parent = db.Key.from_path(
@@ -2300,8 +2310,9 @@

   Args:
     topic: The topic URL of the feed.
-    format: The string 'atom' or 'rss'.
+    format: The string 'atom', 'rss', or 'arbitrary'.
feed_content: The content of the feed, which may include unicode characters.
+      For arbitrary content, this is just the content itself.
     filter_feed: Used for dependency injection.

   Returns:
@@ -2317,6 +2328,9 @@
     xml.sax.SAXException if there is a parse error.
     feed_diff.Error if the feed could not be diffed for any other reason.
   """
+  if format == ARBITRARY:
+    return (feed_content, [], [])
+
   header_footer, entries_map = filter_feed(feed_content, format)

   # Find the new entries we've never seen before, and any entries that we
@@ -2459,9 +2473,9 @@
# of the last successful parse in the feed_record instance to speed this up
   # for the next time through.
   if 'rss' in (feed_record.format or feed_record.content_type or ''):
-    order = (RSS, ATOM)
+    order = (RSS, ATOM, ARBITRARY)
   else:
-    order = (ATOM, RSS)
+    order = (ATOM, RSS, ARBITRARY)

   parse_failures = 0
   for format in order:
@@ -2504,13 +2518,18 @@
     feed_record.update(headers, header_footer, format)
     parse_successful = True

-  if not entities_to_save:
+  if format != ARBITRARY and not entities_to_save:
     logging.debug('No new entries found')
     event_to_deliver = None
   else:
-    logging.info('Saving %d new/updated entries', len(entities_to_save))
+    logging.info(
+        'Saving %d new/updated entries for content '
+        'format=%r, content_type=%r, header_footer_bytes=%d',
+        len(entities_to_save), format, feed_record.content_type,
+        len(header_footer))
     event_to_deliver = EventToDeliver.create_event_for_topic(
-        feed_record.topic, format, header_footer, entry_payloads)
+        feed_record.topic, format, feed_record.content_type,
+        header_footer, entry_payloads)
     entities_to_save.insert(0, event_to_deliver)

   entities_to_save.insert(0, feed_record)
=======================================
--- /trunk/hub/main_test.py     Wed Sep 22 15:16:49 2010
+++ /trunk/hub/main_test.py     Fri Nov  5 13:41:59 2010
@@ -860,7 +860,8 @@
         sub_keys: Key instances corresponding to the entries in 'sub_list'.
     """
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     work_key = event.key()

@@ -881,7 +882,8 @@
   def testCreateEventForTopic(self):
     """Tests that the payload of an event is properly formed."""
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     expected_data = \
 u"""<?xml version="1.0" encoding="utf-8"?>
 <feed>
@@ -892,6 +894,7 @@
 <entry>article3</entry>
 </feed>"""
     self.assertEquals(expected_data, event.payload)
+    self.assertEquals('application/atom+xml', event.content_type)

   def testCreateEventForTopic_Rss(self):
     """Tests that the RSS payload is properly formed."""
@@ -903,7 +906,8 @@
     self.header_footer = (
         '<rss>\n<channel>\n<stuff>blah</stuff>\n<xmldata/></channel>\n</rss>')
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.RSS, self.header_footer, self.test_payloads)
+        self.topic, main.RSS, 'application/rss+xml',
+        self.header_footer, self.test_payloads)
     expected_data = \
 u"""<?xml version="1.0" encoding="utf-8"?>
 <rss>
@@ -916,11 +920,24 @@
 </channel>
 </rss>"""
     self.assertEquals(expected_data, event.payload)
+    self.assertEquals('application/rss+xml', event.content_type)
+
+  def testCreateEventForTopic_Abitrary(self):
+    """Tests that an arbitrary payload is properly formed."""
+    self.test_payloads = []
+    self.header_footer = 'this is my data here'
+    event = EventToDeliver.create_event_for_topic(
+        self.topic, main.ARBITRARY, 'my crazy content type',
+        self.header_footer, self.test_payloads)
+    expected_data = 'this is my data here'
+    self.assertEquals(expected_data, event.payload)
+    self.assertEquals('my crazy content type', event.content_type)

   def testCreateEvent_badHeaderFooter(self):
     """Tests when the header/footer data in an event is invalid."""
self.assertRaises(AssertionError, EventToDeliver.create_event_for_topic,
-        self.topic, main.ATOM, '<feed>has no end tag', self.test_payloads)
+        self.topic, main.ATOM, 'content type unused',
+        '<feed>has no end tag', self.test_payloads)

   def testNormal_noFailures(self):
     """Tests that event delivery with no failures will delete the event."""
@@ -1141,11 +1158,13 @@
   def testMaxFailuresOverride(self):
     """Tests the max_failures override value."""
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     self.assertEquals(None, event.max_failures)

     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads,
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads,
         max_failures=1)
     self.assertEquals(1, event.max_failures)

@@ -1637,11 +1656,15 @@

     self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))

-  def testParseFailure(self):
+  def testArbitraryContent(self):
     """Tests when the feed cannot be parsed as Atom or RSS."""
+    self.entry_list = []
+    self.entry_payloads = []
+    self.header_footer = 'this is all of the content'
     self.expected_exceptions.append(feed_diff.Error('whoops'))
     self.expected_exceptions.append(feed_diff.Error('whoops'))
     FeedToFetch.insert([self.topic])
+    self.headers['content-type'] = 'My Crazy Content Type'
     urlfetch_test_stub.instance.expect(
         'get', self.topic, 200, self.expected_response,
         response_headers=self.headers)
@@ -1649,12 +1672,28 @@

     feed = FeedToFetch.get_by_key_name(get_hash_key_name(self.topic))
     self.assertTrue(feed is None)
-
-    testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
-    testutil.get_tasks(main.FEED_QUEUE, expected_count=1)
+    self.assertEquals(0, len(list(FeedEntryRecord.all())))
+
+    work = EventToDeliver.all().get()
+    event_key = work.key()
+    self.assertEquals(self.topic, work.topic)
+    self.assertEquals('this is all of the content', work.payload)
+    work.delete()
+
+    record = FeedRecord.get_or_create(self.topic)
+    # header_footer not saved for arbitrary data
+    self.assertEquals(None, record.header_footer)
+    self.assertEquals(self.etag, record.etag)
+    self.assertEquals(self.last_modified, record.last_modified)
+    self.assertEquals('my crazy content type', record.content_type)
+
+    task = testutil.get_tasks(main.EVENT_QUEUE, index=0, expected_count=1)
+    self.assertEquals(str(event_key), task['params']['event_key'])
+
+    self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
+
     testutil.get_tasks(main.FEED_RETRIES_QUEUE, expected_count=0)

-    # Parsing errors do not count against the fetching scorer.
     self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))

   def testCacheHit(self):
@@ -2186,6 +2225,24 @@
     self.assertEquals('application/rdf+xml', event.content_type)
     self.assertEquals('rss', FeedRecord.all().get().format)

+  def testPullArbitrary(self):
+    """Tests pulling content of an arbitrary type."""
+    data = 'this is my random payload of data'
+    topic = 'http://example.com/my-topic'
+    callback = 'http://example.com/my-subscriber'
+ self.assertTrue(Subscription.insert(callback, topic, 'token', 'secret'))
+    FeedToFetch.insert([topic])
+    urlfetch_test_stub.instance.expect(
+        'get', topic, 200, data,
+        response_headers={'Content-Type': 'my crazy content type'})
+    self.run_fetch_task()
+    feed = FeedToFetch.get_by_key_name(get_hash_key_name(topic))
+    self.assertTrue(feed is None)
+    event = EventToDeliver.all().get()
+    self.assertEquals(data, event.payload)
+    self.assertEquals('my crazy content type', event.content_type)
+    self.assertEquals('arbitrary', FeedRecord.all().get().format)
+
   def testMultipleFetch(self):
     """Tests doing multiple fetches asynchronously in parallel.

@@ -2308,7 +2365,8 @@
     urlfetch_test_stub.instance.expect(
'post', self.callback3, 299, '', request_payload=self.expected_payload)
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     self.handle('post', ('event_key', str(event.key())))
     self.assertEquals([], list(EventToDeliver.all()))
@@ -2348,7 +2406,8 @@
             'Content-Type': 'application/atom+xml',
             'X-Hub-Signature': 
'sha1=8b0a9da7204afa8ae04fc9439755c556b1e38d99'})
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     self.handle('post', ('event_key', str(event.key())))
     self.assertEquals([], list(EventToDeliver.all()))
@@ -2366,7 +2425,8 @@
             'Content-Type': 'application/rss+xml',
             'X-Hub-Signature': 
'sha1=1607313b6195af74f29158421f0a31aa25d680da'})
     event = EventToDeliver.create_event_for_topic(
- self.topic, main.RSS, self.header_footer_rss, self.test_payloads_rss)
+        self.topic, main.RSS, 'application/rss+xml',
+        self.header_footer_rss, self.test_payloads_rss)
     event.put()
     self.handle('post', ('event_key', str(event.key())))
     self.assertEquals([], list(EventToDeliver.all()))
@@ -2382,7 +2442,8 @@
         self.callback3, self.topic, 'token', 'secret'))
     main.EVENT_SUBSCRIBER_CHUNK_SIZE = 1
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     event_key = str(event.key())

@@ -2426,7 +2487,8 @@
         self.callback3, self.topic, 'token', 'secret'))
     main.EVENT_SUBSCRIBER_CHUNK_SIZE = 2
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     event_key = str(event.key())

@@ -2478,7 +2540,8 @@
           self.callback3, self.topic, 'token', 'secret'))
       main.EVENT_SUBSCRIBER_CHUNK_SIZE = 2
       event = EventToDeliver.create_event_for_topic(
-          self.topic, main.ATOM, self.header_footer, self.test_payloads)
+          self.topic, main.ATOM, 'application/atom+xml',
+          self.header_footer, self.test_payloads)
       event.put()
       event_key = str(event.key())
       self.handle('post', ('event_key', event_key))
@@ -2518,7 +2581,8 @@
         self.callback4, self.topic, 'token', 'secret'))
     main.EVENT_SUBSCRIBER_CHUNK_SIZE = 3
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     event_key = str(event.key())

@@ -2600,7 +2664,8 @@
         self.callback2, self.topic, 'token', 'secret'))
     main.EVENT_SUBSCRIBER_CHUNK_SIZE = 3
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, self.test_payloads)
     event.put()
     event_key = str(event.key())

@@ -2648,7 +2713,8 @@
           'post', self.callback3, 204, '',
           request_payload=self.expected_payload)
       event = EventToDeliver.create_event_for_topic(
-          self.topic, main.ATOM, self.header_footer, self.test_payloads)
+          self.topic, main.ATOM, 'application/atom+xml',
+          self.header_footer, self.test_payloads)
       event.put()
       self.handle('post', ('event_key', str(event.key())))
       self.assertEquals([], list(EventToDeliver.all()))
@@ -2680,7 +2746,8 @@
   def testEventCleanupTooYoung(self):
"""Tests when there are events present, but they're too young to remove."""
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, [])
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, [])
     event.last_modified = self.expire_time + datetime.timedelta(seconds=1)
     event.put()
     self.handle('get')
@@ -2689,12 +2756,14 @@
   def testEventCleanupOldEnough(self):
     """Tests when there are events old enough to clean up."""
     event = EventToDeliver.create_event_for_topic(
-        self.topic, main.ATOM, self.header_footer, [])
+        self.topic, main.ATOM, 'application/atom+xml',
+        self.header_footer, [])
     event.last_modified = self.expire_time
     event.put()

     too_young_event = EventToDeliver.create_event_for_topic(
-        self.topic + 'blah', main.ATOM, self.header_footer, [])
+        self.topic + 'blah', main.ATOM, 'application/atom+xml',
+        self.header_footer, [])
     too_young_event.put()

     self.handle('get')

Reply via email to