Revision: 392
Author: bslatkin
Date: Fri Nov 5 13:41:59 2010
Log: Hub: Adds support for arbitrary content payloads that do not parse as
rss/atom; still no security implementation available for http header values
http://code.google.com/p/pubsubhubbub/source/detail?r=392
Modified:
/trunk/hub/main.py
/trunk/hub/main_test.py
=======================================
--- /trunk/hub/main.py Wed Sep 22 15:16:49 2010
+++ /trunk/hub/main.py Fri Nov 5 13:41:59 2010
@@ -553,6 +553,7 @@
ATOM = 'atom'
RSS = 'rss'
+ARBITRARY = 'arbitrary'
VALID_PORTS = frozenset([
'80', '443', '4443', '8080', '8081', '8082', '8083', '8084', '8085',
@@ -1229,7 +1230,7 @@
topic = db.TextProperty(required=True)
header_footer = db.TextProperty() # Save this for debugging.
last_updated = db.DateTimeProperty(auto_now=True) # The last polling time.
- format = db.TextProperty() # 'atom' or 'rss'
+ format = db.TextProperty() # 'atom', 'rss', or 'arbitrary'
# Content-related headers served by the feed' host.
content_type = db.TextProperty()
@@ -1291,10 +1292,10 @@
Args:
headers: Dictionary of response headers from the feed that should be used
to determine how to poll the feed in the future.
- header_footer: Contents of the feed's XML document minus the entry data;
- if not supplied, the old value will remain.
+ header_footer: Contents of the feed's XML document minus the entry data;
+ if not supplied, the old value will remain. Only saved for feeds.
format: The last parsing format that worked correctly for this feed.
- Should be 'rss' or 'atom'.
+ Should be 'rss', 'atom', or 'arbitrary'.
"""
try:
self.content_type = headers.get('Content-Type', '').lower()
@@ -1311,10 +1312,10 @@
except UnicodeDecodeError:
logging.exception('ETag header had bad encoding')
- if header_footer is not None:
- self.header_footer = header_footer
if format is not None:
self.format = format
+ if header_footer is not None and self.format != ARBITRARY:
+ self.header_footer = header_footer
def get_request_headers(self):
"""Returns the request headers that should be used to pull this feed.
@@ -1439,6 +1440,7 @@
def create_event_for_topic(cls,
topic,
format,
+ content_type,
header_footer,
entry_payloads,
now=datetime.datetime.utcnow,
@@ -1448,9 +1450,12 @@
Args:
topic: The topic that had the event.
- format: Format of the feed, either 'atom' or 'rss'.
+ format: Format of the feed, 'atom', 'rss', or 'arbitrary'.
+ content_type: The original content type of the feed, fetched from the
+ server, if any. May be empty.
header_footer: The header and footer of the published feed into which
- the entry list will be spliced.
+ the entry list will be spliced. For arbitrary content this is the
+ full body of the resource.
entry_payloads: List of strings containing entry payloads (i.e., all
XML data for each entry, including surrounding tags) in order of newest
to oldest.
@@ -1465,27 +1470,32 @@
Returns:
A new EventToDeliver instance that has not been stored.
"""
- close_index = header_footer.rfind('</')
- assert close_index != -1, 'Could not find "</" in feed envelope'
- end_tag = header_footer[close_index:]
- if 'rss' in end_tag:
- # RSS needs special handling, since it actually closes with
- # a combination of </channel></rss> we need to traverse one
- # level higher.
- close_index = header_footer[:close_index].rfind('</')
- assert close_index != -1, 'Could not find "</channel>" in feed envelope'
+ if format in (ATOM, RSS):
+ # This is feed XML.
+ close_index = header_footer.rfind('</')
+ assert close_index != -1, 'Could not find "</" in feed envelope'
end_tag = header_footer[close_index:]
- content_type = 'application/rss+xml'
- elif 'feed' in end_tag:
- content_type = 'application/atom+xml'
- elif 'rdf' in end_tag:
- content_type = 'application/rdf+xml'
-
- payload_list = ['<?xml version="1.0" encoding="utf-8"?>',
- header_footer[:close_index]]
- payload_list.extend(entry_payloads)
- payload_list.append(header_footer[close_index:])
- payload = '\n'.join(payload_list)
+ if 'rss' in end_tag:
+ # RSS needs special handling, since it actually closes with
+ # a combination of </channel></rss> we need to traverse one
+ # level higher.
+ close_index = header_footer[:close_index].rfind('</')
+ assert close_index != -1, 'Could not find "</channel>" in feed envelope'
+ end_tag = header_footer[close_index:]
+ content_type = 'application/rss+xml'
+ elif 'feed' in end_tag:
+ content_type = 'application/atom+xml'
+ elif 'rdf' in end_tag:
+ content_type = 'application/rdf+xml'
+
+ payload_list = ['<?xml version="1.0" encoding="utf-8"?>',
+ header_footer[:close_index]]
+ payload_list.extend(entry_payloads)
+ payload_list.append(header_footer[close_index:])
+ payload = '\n'.join(payload_list)
+ elif format == ARBITRARY:
+ # This is an arbitrary payload.
+ payload = header_footer
if set_parent:
parent = db.Key.from_path(
@@ -2300,8 +2310,9 @@
Args:
topic: The topic URL of the feed.
- format: The string 'atom' or 'rss'.
+ format: The string 'atom', 'rss', or 'arbitrary'.
feed_content: The content of the feed, which may include unicode characters.
+ For arbitrary content, this is just the content itself.
filter_feed: Used for dependency injection.
Returns:
@@ -2317,6 +2328,9 @@
xml.sax.SAXException if there is a parse error.
feed_diff.Error if the feed could not be diffed for any other reason.
"""
+ if format == ARBITRARY:
+ return (feed_content, [], [])
+
header_footer, entries_map = filter_feed(feed_content, format)
# Find the new entries we've never seen before, and any entries that we
@@ -2459,9 +2473,9 @@
# of the last successful parse in the feed_record instance to speed this up
# for the next time through.
if 'rss' in (feed_record.format or feed_record.content_type or ''):
- order = (RSS, ATOM)
+ order = (RSS, ATOM, ARBITRARY)
else:
- order = (ATOM, RSS)
+ order = (ATOM, RSS, ARBITRARY)
parse_failures = 0
for format in order:
@@ -2504,13 +2518,18 @@
feed_record.update(headers, header_footer, format)
parse_successful = True
- if not entities_to_save:
+ if format != ARBITRARY and not entities_to_save:
logging.debug('No new entries found')
event_to_deliver = None
else:
- logging.info('Saving %d new/updated entries', len(entities_to_save))
+ logging.info(
+ 'Saving %d new/updated entries for content '
+ 'format=%r, content_type=%r, header_footer_bytes=%d',
+ len(entities_to_save), format, feed_record.content_type,
+ len(header_footer))
event_to_deliver = EventToDeliver.create_event_for_topic(
- feed_record.topic, format, header_footer, entry_payloads)
+ feed_record.topic, format, feed_record.content_type,
+ header_footer, entry_payloads)
entities_to_save.insert(0, event_to_deliver)
entities_to_save.insert(0, feed_record)
=======================================
--- /trunk/hub/main_test.py Wed Sep 22 15:16:49 2010
+++ /trunk/hub/main_test.py Fri Nov 5 13:41:59 2010
@@ -860,7 +860,8 @@
sub_keys: Key instances corresponding to the entries in 'sub_list'.
"""
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
work_key = event.key()
@@ -881,7 +882,8 @@
def testCreateEventForTopic(self):
"""Tests that the payload of an event is properly formed."""
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
expected_data = \
u"""<?xml version="1.0" encoding="utf-8"?>
<feed>
@@ -892,6 +894,7 @@
<entry>article3</entry>
</feed>"""
self.assertEquals(expected_data, event.payload)
+ self.assertEquals('application/atom+xml', event.content_type)
def testCreateEventForTopic_Rss(self):
"""Tests that the RSS payload is properly formed."""
@@ -903,7 +906,8 @@
self.header_footer = (
'<rss>\n<channel>\n<stuff>blah</stuff>\n<xmldata/></channel>\n</rss>')
event = EventToDeliver.create_event_for_topic(
- self.topic, main.RSS, self.header_footer, self.test_payloads)
+ self.topic, main.RSS, 'application/rss+xml',
+ self.header_footer, self.test_payloads)
expected_data = \
u"""<?xml version="1.0" encoding="utf-8"?>
<rss>
@@ -916,11 +920,24 @@
</channel>
</rss>"""
self.assertEquals(expected_data, event.payload)
+ self.assertEquals('application/rss+xml', event.content_type)
+
+ def testCreateEventForTopic_Arbitrary(self):
+ """Tests that an arbitrary payload is properly formed."""
+ self.test_payloads = []
+ self.header_footer = 'this is my data here'
+ event = EventToDeliver.create_event_for_topic(
+ self.topic, main.ARBITRARY, 'my crazy content type',
+ self.header_footer, self.test_payloads)
+ expected_data = 'this is my data here'
+ self.assertEquals(expected_data, event.payload)
+ self.assertEquals('my crazy content type', event.content_type)
def testCreateEvent_badHeaderFooter(self):
"""Tests when the header/footer data in an event is invalid."""
self.assertRaises(AssertionError,
EventToDeliver.create_event_for_topic,
- self.topic, main.ATOM, '<feed>has no end tag', self.test_payloads)
+ self.topic, main.ATOM, 'content type unused',
+ '<feed>has no end tag', self.test_payloads)
def testNormal_noFailures(self):
"""Tests that event delivery with no failures will delete the event."""
@@ -1141,11 +1158,13 @@
def testMaxFailuresOverride(self):
"""Tests the max_failures override value."""
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
self.assertEquals(None, event.max_failures)
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads,
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads,
max_failures=1)
self.assertEquals(1, event.max_failures)
@@ -1637,11 +1656,15 @@
self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
- def testParseFailure(self):
+ def testArbitraryContent(self):
"""Tests when the feed cannot be parsed as Atom or RSS."""
+ self.entry_list = []
+ self.entry_payloads = []
+ self.header_footer = 'this is all of the content'
self.expected_exceptions.append(feed_diff.Error('whoops'))
self.expected_exceptions.append(feed_diff.Error('whoops'))
FeedToFetch.insert([self.topic])
+ self.headers['content-type'] = 'My Crazy Content Type'
urlfetch_test_stub.instance.expect(
'get', self.topic, 200, self.expected_response,
response_headers=self.headers)
@@ -1649,12 +1672,28 @@
feed = FeedToFetch.get_by_key_name(get_hash_key_name(self.topic))
self.assertTrue(feed is None)
-
- testutil.get_tasks(main.EVENT_QUEUE, expected_count=0)
- testutil.get_tasks(main.FEED_QUEUE, expected_count=1)
+ self.assertEquals(0, len(list(FeedEntryRecord.all())))
+
+ work = EventToDeliver.all().get()
+ event_key = work.key()
+ self.assertEquals(self.topic, work.topic)
+ self.assertEquals('this is all of the content', work.payload)
+ work.delete()
+
+ record = FeedRecord.get_or_create(self.topic)
+ # header_footer not saved for arbitrary data
+ self.assertEquals(None, record.header_footer)
+ self.assertEquals(self.etag, record.etag)
+ self.assertEquals(self.last_modified, record.last_modified)
+ self.assertEquals('my crazy content type', record.content_type)
+
+ task = testutil.get_tasks(main.EVENT_QUEUE, index=0, expected_count=1)
+ self.assertEquals(str(event_key), task['params']['event_key'])
+
+ self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
+
testutil.get_tasks(main.FEED_RETRIES_QUEUE, expected_count=0)
- # Parsing errors do not count against the fetching scorer.
self.assertEquals([(1, 0)], main.FETCH_SCORER.get_scores([self.topic]))
def testCacheHit(self):
@@ -2186,6 +2225,24 @@
self.assertEquals('application/rdf+xml', event.content_type)
self.assertEquals('rss', FeedRecord.all().get().format)
+ def testPullArbitrary(self):
+ """Tests pulling content of an arbitrary type."""
+ data = 'this is my random payload of data'
+ topic = 'http://example.com/my-topic'
+ callback = 'http://example.com/my-subscriber'
+ self.assertTrue(Subscription.insert(callback, topic, 'token', 'secret'))
+ FeedToFetch.insert([topic])
+ urlfetch_test_stub.instance.expect(
+ 'get', topic, 200, data,
+ response_headers={'Content-Type': 'my crazy content type'})
+ self.run_fetch_task()
+ feed = FeedToFetch.get_by_key_name(get_hash_key_name(topic))
+ self.assertTrue(feed is None)
+ event = EventToDeliver.all().get()
+ self.assertEquals(data, event.payload)
+ self.assertEquals('my crazy content type', event.content_type)
+ self.assertEquals('arbitrary', FeedRecord.all().get().format)
+
def testMultipleFetch(self):
"""Tests doing multiple fetches asynchronously in parallel.
@@ -2308,7 +2365,8 @@
urlfetch_test_stub.instance.expect(
'post', self.callback3, 299, '',
request_payload=self.expected_payload)
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
self.handle('post', ('event_key', str(event.key())))
self.assertEquals([], list(EventToDeliver.all()))
@@ -2348,7 +2406,8 @@
'Content-Type': 'application/atom+xml',
'X-Hub-Signature':
'sha1=8b0a9da7204afa8ae04fc9439755c556b1e38d99'})
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
self.handle('post', ('event_key', str(event.key())))
self.assertEquals([], list(EventToDeliver.all()))
@@ -2366,7 +2425,8 @@
'Content-Type': 'application/rss+xml',
'X-Hub-Signature':
'sha1=1607313b6195af74f29158421f0a31aa25d680da'})
event = EventToDeliver.create_event_for_topic(
- self.topic, main.RSS, self.header_footer_rss, self.test_payloads_rss)
+ self.topic, main.RSS, 'application/rss+xml',
+ self.header_footer_rss, self.test_payloads_rss)
event.put()
self.handle('post', ('event_key', str(event.key())))
self.assertEquals([], list(EventToDeliver.all()))
@@ -2382,7 +2442,8 @@
self.callback3, self.topic, 'token', 'secret'))
main.EVENT_SUBSCRIBER_CHUNK_SIZE = 1
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
event_key = str(event.key())
@@ -2426,7 +2487,8 @@
self.callback3, self.topic, 'token', 'secret'))
main.EVENT_SUBSCRIBER_CHUNK_SIZE = 2
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
event_key = str(event.key())
@@ -2478,7 +2540,8 @@
self.callback3, self.topic, 'token', 'secret'))
main.EVENT_SUBSCRIBER_CHUNK_SIZE = 2
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
event_key = str(event.key())
self.handle('post', ('event_key', event_key))
@@ -2518,7 +2581,8 @@
self.callback4, self.topic, 'token', 'secret'))
main.EVENT_SUBSCRIBER_CHUNK_SIZE = 3
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
event_key = str(event.key())
@@ -2600,7 +2664,8 @@
self.callback2, self.topic, 'token', 'secret'))
main.EVENT_SUBSCRIBER_CHUNK_SIZE = 3
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
event_key = str(event.key())
@@ -2648,7 +2713,8 @@
'post', self.callback3, 204, '',
request_payload=self.expected_payload)
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, self.test_payloads)
event.put()
self.handle('post', ('event_key', str(event.key())))
self.assertEquals([], list(EventToDeliver.all()))
@@ -2680,7 +2746,8 @@
def testEventCleanupTooYoung(self):
"""Tests when there are events present, but they're too young to remove."""
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, [])
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, [])
event.last_modified = self.expire_time + datetime.timedelta(seconds=1)
event.put()
self.handle('get')
@@ -2689,12 +2756,14 @@
def testEventCleanupOldEnough(self):
"""Tests when there are events old enough to clean up."""
event = EventToDeliver.create_event_for_topic(
- self.topic, main.ATOM, self.header_footer, [])
+ self.topic, main.ATOM, 'application/atom+xml',
+ self.header_footer, [])
event.last_modified = self.expire_time
event.put()
too_young_event = EventToDeliver.create_event_for_topic(
- self.topic + 'blah', main.ATOM, self.header_footer, [])
+ self.topic + 'blah', main.ATOM, 'application/atom+xml',
+ self.header_footer, [])
too_young_event.put()
self.handle('get')