Revision: 375
Author: bslatkin
Date: Sun Jul 11 22:24:18 2010
Log: hub: adds non-standard virtual feeds hook, fork-join tweaks
http://code.google.com/p/pubsubhubbub/source/detail?r=375
Added:
/trunk/nonstandard/virtual_feed.py
/trunk/nonstandard/virtual_feed_test.py
Modified:
/trunk/hub/feed_diff_test.py
/trunk/hub/fork_join_queue.py
/trunk/hub/fork_join_queue_test.py
/trunk/hub/main.py
/trunk/hub/main_test.py
/trunk/hub/testutil.py
=======================================
--- /dev/null
+++ /trunk/nonstandard/virtual_feed.py Sun Jul 11 22:24:18 2010
@@ -0,0 +1,167 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Virtual feed receiver for publishing aggregate feeds using fat pings.
+
+This isn't part of the PubSubHubbub spec. We have no intention of making it
+part of the spec. This extension hook is useful for creating a virtual
+feed (e.g., a firehose feed) using an aggregation of other feeds that are
+fat pinged to the hub.
+
+It's up to your code to connect your fat-ping request handler to the
+'inject_virtual_feed' method (see fat_publish.py for one way to do it).
That
+function will parse your feed, extract the entries, and enqueue the virtual
+feed update. Multiple fat pings delivered in this manner will be collated
+together by their virtual feed topics into a combined payload, which is
then
+injected into the reference hub's event delivery pipeline. This collation
is
+useful because it controls how many HTTP requests will be sent to
subscribers
+to this virtual feed, and lets you make the tradeoff between delivery
latency
+and request overhead.
+"""
+
+import logging
+
+from google.appengine.ext import db
+from google.appengine.ext import webapp
+
+import fork_join_queue
+
+################################################################################
+# Constants
+
+VIRTUAL_FEED_QUEUE = 'virtual-feeds'
+
+################################################################################
+
+# Define these symbols for testing.
+if 'register' not in globals():
+ class Hook(object):
+ pass
+ def work_queue_only(func):
+ return func
+ sha1_hash = None
+
+
+class FeedFragment(db.Model):
+ """Represents a fragment of a virtual feed that will be collated.
+
+ The Key and key_name are not used.
+
+ Fields:
+ topic: The topic of the virtual feed being collated.
+ header_footer: The feed envelope.
+ entries: The <entry>...</entry> text segments that were parsed from the
+ source feeds, already joined together with newlines.
+ format: 'rss' or 'atom'.
+ """
+ topic = db.TextProperty()
+ header_footer = db.TextProperty()
+ entries = db.TextProperty()
+ format = db.TextProperty()
+
+
+VIRTUAL_FEED_QUEUE = fork_join_queue.MemcacheForkJoinQueue(
+ FeedFragment,
+ None,
+ '/work/virtual_feeds',
+ VIRTUAL_FEED_QUEUE,
+ batch_size=20,
+ batch_period_ms=1000,
+ lock_timeout_ms=1000,
+ sync_timeout_ms=250,
+ stall_timeout_ms=30000,
+ acquire_timeout_ms=10,
+ acquire_attempts=50,
+ shard_count=1,
+ expiration_seconds=60) # Give up on fragments after 60 seconds.
+
+
+def inject_virtual_feed(topic, format, header_footer, entries_map):
+ """Injects a virtual feed update to be collated and then delievered.
+
+ Args:
+ topic: The topic URL for the virtual feed.
+ format: The format of the virtual feed ('rss' or 'atom').
+ header_footer: The feed envelope to use for the whole virtual feed.
+ entries_map: Dictionary mapping feed entry IDs to strings containing
+ full entry payloads (e.g., from <entry> to </entry> including the
tags).
+
+ Raises:
+ MemcacheError if the virtual feed could not be injected.
+ """
+ fragment = FeedFragment(
+ key=db.Key.from_path(FeedFragment.kind(), 'unused'),
+ topic=topic,
+ header_footer=header_footer,
+ entries='\n'.join(entries_map.values()),
+ format=format)
+
+ # Update the name of the queue to include a hash of the topic URL. This
+ # allows us to use a single VIRTUAL_FEED_QUEUE instance to represent a
+ # different logical queue for each virtual feed topic we would like to
+ # collate.
+ VIRTUAL_FEED_QUEUE.name = 'fjq-%s-%s-' % (
+ FeedFragment.kind(), sha1_hash(topic))
+ work_index = VIRTUAL_FEED_QUEUE.next_index()
+ try:
+ VIRTUAL_FEED_QUEUE.put(work_index, [fragment])
+ finally:
+ VIRTUAL_FEED_QUEUE.add(work_index)
+
+
+class CollateFeedHandler(webapp.RequestHandler):
+ """Worker handler that collates virtual feed updates and enqueues
them."""
+
+ @work_queue_only
+ def post(self):
+    # Restore the pseudo-name of the queue so we can properly pop tasks
+    # from it for the specific virtual feed this task is targeting.
+ task_name = self.request.headers['X-AppEngine-TaskName']
+ VIRTUAL_FEED_QUEUE.name, rest = task_name.split('--')
+ VIRTUAL_FEED_QUEUE.name += '-'
+
+ fragment_list = VIRTUAL_FEED_QUEUE.pop_request(self.request)
+ if not fragment_list:
+ logging.warning('Pop of virtual feed task %r found no fragments.',
+ task_name)
+ return
+
+ fragment = fragment_list[0]
+ entry_payloads = [f.entries for f in fragment_list]
+
+ def txn():
+ event_to_deliver = EventToDeliver.create_event_for_topic(
+ fragment.topic, fragment.format, fragment.header_footer,
+ entry_payloads, set_parent=False, max_failures=1)
+ db.put(event_to_deliver)
+ event_to_deliver.enqueue()
+
+ db.run_in_transaction(txn)
+ logging.debug('Injected %d fragments for virtual topic %r',
+ len(fragment_list), fragment.topic)
+
+
+class VirtualFeedHook(Hook):
+ """Adds the CollateFeedHandler to the list of request handlers."""
+
+ def inspect(self, args, kwargs):
+ args[0].append((r'/work/virtual_feeds', CollateFeedHandler))
+ return False
+
+
+if 'register' in globals():
+ register(modify_handlers, VirtualFeedHook())
=======================================
--- /dev/null
+++ /trunk/nonstandard/virtual_feed_test.py Sun Jul 11 22:24:18 2010
@@ -0,0 +1,216 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for the virtual_feed module."""
+
+import logging
+logging.basicConfig(format='%(levelname)-8s %(filename)s] %(message)s')
+import os
+import sys
+import unittest
+
+# Run these tests from the 'hub' directory.
+sys.path.insert(0, os.getcwd())
+os.chdir('../hub')
+sys.path.insert(0, os.getcwd())
+
+import testutil
+testutil.fix_path()
+
+from google.appengine.ext import webapp
+
+import main
+import virtual_feed
+
+################################################################################
+
+# Do aliasing that would happen anyway during Hook module loading.
+virtual_feed.sha1_hash = main.sha1_hash
+virtual_feed.EventToDeliver = main.EventToDeliver
+
+
+class InjectVirtualFeedTest(testutil.HandlerTestBase):
+ """Tests for the inject_virtual_feed function."""
+
+ def setUp(self):
+ """Sets up the test harness."""
+ testutil.setup_for_testing()
+ self.topic = 'http://example.com/my-topic/1'
+ self.topic2 = 'http://example.com/my-topic/2'
+ self.format = 'atom'
+ self.header_footer = '<feed><id>tag:my-id</id>\n</feed>'
+ self.entries_map = {
+ 'one': '<entry>first data</entry>',
+ 'two': '<entry>second data</entry>',
+ 'three': '<entry>third data</entry>',
+ }
+ os.environ['CURRENT_VERSION_ID'] = 'my-version.1234'
+ virtual_feed.VIRTUAL_FEED_QUEUE.queue_name = 'default'
+
+ def testInsertOneFragment(self):
+ """Tests inserting one new fragment."""
+ virtual_feed.inject_virtual_feed(
+ self.topic, self.format, self.header_footer, self.entries_map)
+ task = testutil.get_tasks('default', index=0, expected_count=1)
+ self.assertTrue(task['name'].startswith(
+ 'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f015e6830d41--'
+ 'my-version-'))
+ results = virtual_feed.VIRTUAL_FEED_QUEUE.pop(task['name'])
+ self.assertEquals(1, len(results))
+ fragment = results[0]
+ self.assertEquals(self.topic, fragment.topic)
+ self.assertEquals(self.header_footer, fragment.header_footer)
+ self.assertEquals(self.format, fragment.format)
+ self.assertEquals(
+ '<entry>third data</entry>\n' # Hash order
+ '<entry>second data</entry>\n'
+ '<entry>first data</entry>',
+ fragment.entries)
+
+ def testInsertMultipleFragments(self):
+ """Tests inserting multiple fragments on different virtual topics."""
+ virtual_feed.inject_virtual_feed(
+ self.topic, self.format, self.header_footer, self.entries_map)
+ virtual_feed.inject_virtual_feed(
+ self.topic2, self.format, self.header_footer, self.entries_map)
+
+ task1, task2 = testutil.get_tasks('default', expected_count=2)
+ self.assertTrue(task1['name'].startswith(
+ 'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f015e6830d41--'
+ 'my-version-'))
+ self.assertTrue(task2['name'].startswith(
+ 'fjq-FeedFragment-0449375bf584a7a5d3a09b344a726dead30c3927--'
+ 'my-version-'))
+
+ virtual_feed.VIRTUAL_FEED_QUEUE.name = \
+ 'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f015e6830d41-'
+ fragment1 = virtual_feed.VIRTUAL_FEED_QUEUE.pop(task1['name'])[0]
+ self.assertEquals(self.topic, fragment1.topic)
+
+ virtual_feed.VIRTUAL_FEED_QUEUE.name = \
+ 'fjq-FeedFragment-0449375bf584a7a5d3a09b344a726dead30c3927-'
+ fragment2 = virtual_feed.VIRTUAL_FEED_QUEUE.pop(task2['name'])[0]
+ self.assertEquals(self.topic2, fragment2.topic)
+
+
+class CollateFeedHandlerTest(testutil.HandlerTestBase):
+ """Tests for the CollateFeedHandler class."""
+
+ handler_class = virtual_feed.CollateFeedHandler
+
+ def setUp(self):
+ """Sets up the test harness."""
+ testutil.HandlerTestBase.setUp(self)
+ self.topic = 'http://example.com/my-topic/1'
+ self.topic2 = 'http://example.com/my-topic/2'
+ self.format = 'atom'
+ self.header_footer = '<feed><id>tag:my-id</id>\n</feed>'
+ self.entries_map = {
+ 'one': '<entry>first data</entry>',
+ 'two': '<entry>second data</entry>',
+ 'three': '<entry>third data</entry>',
+ }
+ os.environ['CURRENT_VERSION_ID'] = 'my-version.1234'
+ virtual_feed.VIRTUAL_FEED_QUEUE.queue_name = 'default'
+
+ def testNoWork(self):
+ """Tests when the queue is empty."""
+ os.environ['HTTP_X_APPENGINE_TASKNAME'] = (
+ 'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f01abb6830d41--'
+ 'my-version-42630240-2654435761-0')
+ self.handle('post')
+
+ def testOneFragment(self):
+ """Tests when there is one fragment in the queue."""
+ virtual_feed.inject_virtual_feed(
+ self.topic, self.format, self.header_footer, self.entries_map)
+ task = testutil.get_tasks('default', index=0, expected_count=1)
+ os.environ['HTTP_X_APPENGINE_TASKNAME'] = task['name']
+ self.handle('post')
+
+ event_list = list(main.EventToDeliver.all())
+ self.assertEquals(1, len(event_list))
+ event = event_list[0]
+
+ # No parent to ensure it's not rate limited by an entity group.
+ self.assertEquals(None, event.key().parent())
+
+ self.assertEquals(self.topic, event.topic)
+ self.assertEquals(
+ '<?xml version="1.0" encoding="utf-8"?>\n'
+ '<feed><id>tag:my-id</id>\n\n'
+ '<entry>third data</entry>\n'
+ '<entry>second data</entry>\n'
+ '<entry>first data</entry>\n'
+ '</feed>',
+ event.payload)
+ self.assertEquals('application/atom+xml', event.content_type)
+ self.assertEquals(1, event.max_failures)
+
+ task = testutil.get_tasks('event-delivery', index=0, expected_count=1)
+ self.assertEquals(str(event.key()), task['params']['event_key'])
+
+ def testMultipleFragments(self):
+ """Tests when there is more than one fragment in the queue."""
+ virtual_feed.inject_virtual_feed(
+ self.topic, self.format, self.header_footer, self.entries_map)
+ virtual_feed.inject_virtual_feed(
+ self.topic, self.format, self.header_footer, self.entries_map)
+ task = testutil.get_tasks('default', index=0, expected_count=1)
+ os.environ['HTTP_X_APPENGINE_TASKNAME'] = task['name']
+ self.handle('post')
+
+ event_list = list(main.EventToDeliver.all())
+ self.assertEquals(1, len(event_list))
+ event = event_list[0]
+
+ self.assertEquals(self.topic, event.topic)
+ self.assertEquals(
+ '<?xml version="1.0" encoding="utf-8"?>\n'
+ '<feed><id>tag:my-id</id>\n\n'
+ '<entry>third data</entry>\n'
+ '<entry>second data</entry>\n'
+ '<entry>first data</entry>\n'
+ '<entry>third data</entry>\n'
+ '<entry>second data</entry>\n'
+ '<entry>first data</entry>\n'
+ '</feed>',
+ event.payload)
+
+ def testMultipleQueues(self):
+ """Tests multiple virtual feeds and queues."""
+ virtual_feed.inject_virtual_feed(
+ self.topic, self.format, self.header_footer, self.entries_map)
+ virtual_feed.inject_virtual_feed(
+ self.topic2, self.format, self.header_footer, self.entries_map)
+ task1, task2 = testutil.get_tasks('default', expected_count=2)
+
+ os.environ['HTTP_X_APPENGINE_TASKNAME'] = task1['name']
+ self.handle('post')
+
+ os.environ['HTTP_X_APPENGINE_TASKNAME'] = task2['name']
+ self.handle('post')
+
+ event_list = list(main.EventToDeliver.all())
+ self.assertEquals(2, len(event_list))
+ self.assertEquals(self.topic, event_list[0].topic)
+ self.assertEquals(self.topic2, event_list[1].topic)
+
+################################################################################
+
+if __name__ == '__main__':
+ unittest.main()
=======================================
--- /trunk/hub/feed_diff_test.py Thu Jun 3 15:45:54 2010
+++ /trunk/hub/feed_diff_test.py Sun Jul 11 22:24:18 2010
@@ -303,6 +303,9 @@
self.load_feed('xhtml_entities.xml')
self.fail('Should have raised an exception')
except feed_diff.Error, e:
+ # TODO(bslatkin): Fix this datafile in head.
+      # This ensures that the failure is because of bad entities, not a
+ # missing test data file.
self.assertFalse('IOError' in str(e))
=======================================
--- /trunk/hub/fork_join_queue.py Sun Jun 27 11:46:09 2010
+++ /trunk/hub/fork_join_queue.py Sun Jul 11 22:24:18 2010
@@ -180,16 +180,27 @@
self.stall_timeout = stall_timeout_ms / 1000.0
self.acquire_timeout = acquire_timeout_ms / 1000.0
self.acquire_attempts = acquire_attempts
-
- self.lock_name = self.name + '-lock'
- self.add_counter_template = self.name + '-add-lock:%d'
- self.index_name = self.name + '-index'
self.batch_delta = datetime.timedelta(microseconds=batch_period_ms *
1000)
def get_queue_name(self, index):
"""Returns the name of the queue to use based on the given work
index."""
return self.queue_name
+ @property
+ def lock_name(self):
+ """Returns the lock key prefix for the current prefix name."""
+ return self.name + '-lock'
+
+ @property
+ def add_counter_template(self):
+ """Returns the add counter prefix template for the current prefix
name."""
+ return self.name + '-add-lock:%d'
+
+ @property
+ def index_name(self):
+ """Returns the index key prefix for the current prefix name."""
+ return self.name + '-index'
+
def next_index(self,
memget=memcache.get,
memincr=memcache.incr,
@@ -226,6 +237,10 @@
return next_index
time.sleep(self.acquire_timeout)
else:
+      # Force the index forward; here we're stuck in a loop where the memcache
+      # index was evicted and all new lock acquisitions are reusing old locks
+      # that were already closed off to new writers.
+      memincr(self.index_name)
raise WriterLockError('Task adder could not increment writer lock.')
def add(self, index, gettime=time.time):
@@ -235,23 +250,37 @@
# memcache is evicted. This prevents new task names from overlapping
with
# old ones.
nearest_gap = int(now_stamp / self.stall_timeout)
+ # Include major version in the task name to ensure that test tasks
+ # enqueued from a non-default major version will run in the new context
+ # instead of the default major version.
+ major_version, minor_version =
os.environ['CURRENT_VERSION_ID'].split('.')
+ task_name = '%s-%s-%d-%d-%d' % (
+ self.name, major_version, nearest_gap, index, 0)
+
+    # When the batch_delta is zero, then there should be no ETA, the task
+    # should run immediately and the reader will busy wait for all writers.
+    # NOTE: must use truthiness here, not '== 0' -- batch_delta is a
+    # datetime.timedelta, and comparing a timedelta to the int 0 is always
+    # False, so the eta = None branch would never be taken.
+    if not self.batch_delta:
+      eta = None
+    else:
+      eta = datetime_from_stamp(now_stamp) + self.batch_delta
+
try:
taskqueue.Task(
method='POST',
- name='%s-%d-%d-%d' % (self.name, nearest_gap, index, 0),
+ name=task_name,
url=self.task_path,
- eta=datetime_from_stamp(now_stamp) + self.batch_delta
+ eta=eta
).add(self.get_queue_name(index))
except taskqueue.TaskAlreadyExistsError:
# This is okay. It means the task has already been inserted by
another
# add() call for this same batch. We're holding the lock at this
point
# so we know that job won't start yet.
pass
- except taskqueue.TombstonedTaskError:
+ except taskqueue.TombstonedTaskError, e:
# This is bad. This means 1) the lock we held expired and the task
already
# ran, 2) this task name somehow overlaps with an old task. Return
the
# error to the caller so they can try again.
- raise TaskConflictError('Task named "%s" tombstoned' % task.name)
+ raise TaskConflictError('Task named tombstoned: %s' % e)
finally:
# Don't bother checking the decr status; worst-case the worker job
# will time out after some number of seconds and proceed anyways.
@@ -295,8 +324,7 @@
return False
def _query_work(self, index, cursor):
- """TODO
- """
+ """Queries for work in the Datastore."""
query = (self.model_class.all()
.filter('%s =' % self.index_property.name, index)
.order('__key__'))
@@ -314,6 +342,7 @@
Returns:
A list of work items, if any.
"""
+ # TODO: Use request.headers['X-AppEngine-TaskName'] instead of environ.
return self.pop(os.environ['HTTP_X_APPENGINE_TASKNAME'],
request.get('cursor'))
@@ -338,17 +367,22 @@
result_list, cursor = self._query_work(index, cursor)
if len(result_list) == self.batch_size:
- try:
- taskqueue.Task(
- method='POST',
- name='%s-%d-%d' % (rest, index, generation + 1),
- url=self.task_path,
- params={'cursor': cursor}
- ).add(self.get_queue_name(index))
- except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError):
- # This means the continuation chain already started and this root
- # task failed for some reason; no problem.
- pass
+ for i in xrange(3):
+ try:
+ taskqueue.Task(
+ method='POST',
+ name='%s-%d-%d' % (rest, index, generation + 1),
+ url=self.task_path,
+ params={'cursor': cursor}
+ ).add(self.get_queue_name(index))
+ except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError):
+ # This means the continuation chain already started and this root
+ # task failed for some reason; no problem.
+ break
+ except (taskqueue.TransientError, taskqueue.InternalError):
+ # Ignore transient taskqueue errors.
+ if i == 2:
+ raise
return result_list
=======================================
--- /trunk/hub/fork_join_queue_test.py Sun Jun 27 11:46:09 2010
+++ /trunk/hub/fork_join_queue_test.py Sun Jul 11 22:24:18 2010
@@ -48,7 +48,7 @@
'/path/to/my/task',
'default',
batch_size=3,
- batch_period_ms=200,
+ batch_period_ms=2200,
lock_timeout_ms=1000,
sync_timeout_ms=250,
stall_timeout_ms=30000,
@@ -56,13 +56,27 @@
acquire_attempts=20)
+TEST_QUEUE_ZERO_BATCH_TIME = fork_join_queue.ForkJoinQueue(
+ TestModel,
+ TestModel.work_index,
+ '/path/to/my/task',
+ 'default',
+ batch_size=3,
+ batch_period_ms=0,
+ lock_timeout_ms=1000,
+ sync_timeout_ms=250,
+ stall_timeout_ms=30000,
+ acquire_timeout_ms=50,
+ acquire_attempts=20)
+
+
SHARDED_QUEUE = fork_join_queue.ShardedForkJoinQueue(
TestModel,
TestModel.work_index,
'/path/to/my/task',
'default-%(shard)s',
batch_size=3,
- batch_period_ms=200,
+ batch_period_ms=2200,
lock_timeout_ms=1000,
sync_timeout_ms=250,
stall_timeout_ms=30000,
@@ -77,7 +91,7 @@
'/path/to/my/task',
'default',
batch_size=3,
- batch_period_ms=200,
+ batch_period_ms=2200,
lock_timeout_ms=1000,
sync_timeout_ms=250,
stall_timeout_ms=30000,
@@ -96,17 +110,26 @@
self.now2 = 1274078097.79072
self.gettime1 = lambda: self.now1
self.gettime2 = lambda: self.now2
+ os.environ['CURRENT_VERSION_ID'] = 'myversion.1234'
if 'HTTP_X_APPENGINE_TASKNAME' in os.environ:
del os.environ['HTTP_X_APPENGINE_TASKNAME']
- def expect_task(self, index, generation=0, now_time=None, cursor=None):
+ def expect_task(self,
+ index,
+ generation=0,
+ now_time=None,
+ cursor=None,
+ batch_period_ms=2200000):
"""Creates an expected task dictionary."""
if now_time is None:
now_time = self.now1
    gap_number = int(now_time / 30.0)
+    work_item = {
- 'name': 'fjq-TestModel-%d-%d-%d' % (gap_number, index, generation),
- 'eta': int(10**6 * now_time) + 500000,
+ 'name': 'fjq-TestModel-myversion-%d-%d-%d' % (
+ gap_number, index, generation),
+ # Working around weird rounding behavior of task queue stub.
+ 'eta': (10**6 * now_time) + batch_period_ms,
}
if cursor is not None:
work_item['cursor'] = cursor
@@ -117,7 +140,10 @@
found_tasks.sort(key=lambda t: t['eta'])
for expected, found in zip(expected_tasks, found_tasks):
self.assertEquals(expected['name'], found['name'])
- self.assertEquals(int(expected['eta'] / 10**6), int(found['eta'] /
10**6))
+ # Round these task ETAs to integers because the taskqueue stub does
+ # not support floating-point ETAs.
+ self.assertEquals(round(expected['eta'] / 10**6),
+ round(found['eta'] / 10**6))
self.assertEquals(expected.get('cursor'),
found.get('params', {}).get('cursor'))
self.assertEquals('POST', found['method'])
@@ -223,10 +249,18 @@
def testNextIndexBusyWaitFail(self):
"""Tests when busy waiting for a new index fails."""
+ seen_keys = []
+ def fake_incr(key, *args, **kwargs):
+ seen_keys.append(key)
+ return 100
self.assertRaises(
fork_join_queue.WriterLockError,
TEST_QUEUE.next_index,
- memincr=lambda *a, **k: 100)
+ memincr=fake_incr)
+ self.assertEquals(
+ (['fjq-TestModel-add-lock:2654435761'] * 20) +
+ ['fjq-TestModel-index'],
+ seen_keys)
def testPopOne(self):
"""Tests popping a single entity from the queue."""
@@ -336,6 +370,16 @@
work_index = TEST_QUEUE.next_index()
self.assertFalse(TEST_QUEUE._increment_index(work_index))
+ def testZeroBatchTime(self):
+ """Tests that zero batch time results in no task ETA."""
+ work_index = TEST_QUEUE_ZERO_BATCH_TIME.next_index()
+ task = TestModel(work_index=work_index, number=1)
+ db.put(task)
+ TEST_QUEUE_ZERO_BATCH_TIME.add(work_index, gettime=self.gettime1)
+ self.assertTasksEqual(
+ [self.expect_task(work_index, batch_period_ms=0)],
+ testutil.get_tasks('default', usec_eta=True))
+
def testShardedQueue(self):
"""Tests adding and popping from a sharded queue with continuation."""
from google.appengine.api import apiproxy_stub_map
=======================================
--- /trunk/hub/main.py Sun Jun 27 12:57:44 2010
+++ /trunk/hub/main.py Sun Jul 11 22:24:18 2010
@@ -1117,6 +1117,8 @@
if memory_only:
cls.FORK_JOIN_QUEUE.put(work_index, feed_list)
else:
+ # TODO(bslatkin): Insert fetching tasks here to fix the polling
+ # mode for this codebase.
db.put(feed_list)
finally:
if memory_only:
@@ -1424,14 +1426,21 @@
last_callback = db.TextProperty(default='') # For paging Subscriptions
failed_callbacks = db.ListProperty(db.Key) # Refs to Subscription
entities
delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES)
- retry_attempts = db.IntegerProperty(default=0)
+ retry_attempts = db.IntegerProperty(default=0, indexed=False)
last_modified = db.DateTimeProperty(required=True)
totally_failed = db.BooleanProperty(default=False)
content_type = db.TextProperty(default='')
+  max_failures = db.IntegerProperty(indexed=False)
@classmethod
- def create_event_for_topic(cls, topic, format, header_footer,
entry_payloads,
- now=datetime.datetime.utcnow):
+ def create_event_for_topic(cls,
+ topic,
+ format,
+ header_footer,
+ entry_payloads,
+ now=datetime.datetime.utcnow,
+ set_parent=True,
+ max_failures=None):
"""Creates an event to deliver for a topic and set of published
entries.
Args:
@@ -1443,6 +1452,12 @@
XML data for each entry, including surrounding tags) in order of
newest
to oldest.
now: Returns the current time as a UTC datetime. Used in tests.
+ set_parent: Set the parent to the FeedRecord for the given topic.
This is
+ necessary for the parse_feed flow's transaction. Default is True.
Set
+ to False if this EventToDeliver will be written outside of the
+ FeedRecord transaction.
+ max_failures: Maximum number of failures to allow for this event.
When
+ None (the default) it will use the MAX_DELIVERY_FAILURES constant.
Returns:
A new EventToDeliver instance that has not been stored.
@@ -1469,14 +1484,20 @@
payload_list.append(header_footer[close_index:])
payload = '\n'.join(payload_list)
+ if set_parent:
+ parent = db.Key.from_path(
+ FeedRecord.kind(), FeedRecord.create_key_name(topic))
+ else:
+ parent = None
+
return cls(
- parent=db.Key.from_path(
- FeedRecord.kind(), FeedRecord.create_key_name(topic)),
+ parent=parent,
topic=topic,
topic_hash=sha1_hash(topic),
payload=payload,
last_modified=now(),
- content_type=content_type)
+ content_type=content_type,
+ max_failures=max_failures)
def get_next_subscribers(self, chunk_size=None):
"""Retrieve the next set of subscribers to attempt delivery for this
event.
@@ -1577,6 +1598,8 @@
retry_delay = retry_period * (2 ** self.retry_attempts)
self.last_modified += datetime.timedelta(seconds=retry_delay)
self.retry_attempts += 1
+ if self.max_failures is not None:
+ max_failures = self.max_failures
if self.retry_attempts > max_failures:
self.totally_failed = True
=======================================
--- /trunk/hub/main_test.py Sun Jun 27 11:46:09 2010
+++ /trunk/hub/main_test.py Sun Jul 11 22:24:18 2010
@@ -1135,6 +1135,30 @@
testutil.get_tasks(main.EVENT_QUEUE, expected_count=1)
testutil.get_tasks(main.POLLING_QUEUE, expected_count=1)
+ def testMaxFailuresOverride(self):
+ """Tests the max_failures override value."""
+ event = EventToDeliver.create_event_for_topic(
+ self.topic, main.ATOM, self.header_footer, self.test_payloads)
+ self.assertEquals(None, event.max_failures)
+
+ event = EventToDeliver.create_event_for_topic(
+ self.topic, main.ATOM, self.header_footer, self.test_payloads,
+ max_failures=1)
+ self.assertEquals(1, event.max_failures)
+
+ Subscription.insert(
+ self.callback, self.topic, self.token, self.secret)
+ subscription_list = list(Subscription.all())
+
+ event.put()
+ event.update(False, subscription_list)
+ event2 = db.get(event.key())
+ self.assertFalse(event2.totally_failed)
+
+ event2.update(False, [])
+ event3 = db.get(event.key())
+ self.assertTrue(event3.totally_failed)
+
################################################################################
class PublishHandlerTest(testutil.HandlerTestBase):
=======================================
--- /trunk/hub/testutil.py Mon May 17 01:57:39 2010
+++ /trunk/hub/testutil.py Sun Jul 11 22:24:18 2010
@@ -29,10 +29,12 @@
TEST_APP_ID = 'my-app-id'
+TEST_VERSION_ID = 'my-version.1234'
# Assign the application ID up front here so we can create db.Key instances
# before doing any other test setup.
os.environ['APPLICATION_ID'] = TEST_APP_ID
+os.environ['CURRENT_VERSION_ID'] = TEST_VERSION_ID
def fix_path():