Revision: 375
Author: bslatkin
Date: Sun Jul 11 22:24:18 2010
Log: hub: adds non-standard virtual feeds hook, fork-join tweaks
http://code.google.com/p/pubsubhubbub/source/detail?r=375

Added:
 /trunk/nonstandard/virtual_feed.py
 /trunk/nonstandard/virtual_feed_test.py
Modified:
 /trunk/hub/feed_diff_test.py
 /trunk/hub/fork_join_queue.py
 /trunk/hub/fork_join_queue_test.py
 /trunk/hub/main.py
 /trunk/hub/main_test.py
 /trunk/hub/testutil.py

=======================================
--- /dev/null
+++ /trunk/nonstandard/virtual_feed.py  Sun Jul 11 22:24:18 2010
@@ -0,0 +1,167 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Virtual feed receiver for publishing aggregate feeds using fat pings.
+
+This isn't part of the PubSubHubbub spec. We have no intentions of making it to
+be part of the spec. This extension hook is useful for creating a virtual
+feed (e.g., a firehose feed) using an aggregation of other feeds that are
+fat pinged to the hub.
+
+It's up to your code to connect your fat-ping request handler to the
+'inject_virtual_feed' method (see fat_publish.py for one way to do it). That
+function will parse your feed, extract the entries, and enqueue the virtual
+feed update. Multiple fat pings delivered in this manner will be collated
+together by their virtual feed topics into a combined payload, which is then
+injected into the reference hub's event delivery pipeline. This collation is
+useful because it controls how many HTTP requests will be sent to subscribers
+to this virtual feed, and lets you make the tradeoff between delivery latency
+and request overhead.
+"""
+
+import logging
+
+from google.appengine.ext import db
+from google.appengine.ext import webapp
+
+import fork_join_queue
+
+################################################################################
+# Constants
+
+VIRTUAL_FEED_QUEUE = 'virtual-feeds'
+
+################################################################################
+
+# Define these symbols for testing.
+# When no 'register' symbol exists in this module's globals, supply stand-ins
+# for the names the rest of this module references.
+if 'register' not in globals():
+  class Hook(object):
+    # Empty placeholder base class for VirtualFeedHook.
+    pass
+  def work_queue_only(func):
+    # Pass-through decorator: returns the wrapped function unchanged.
+    return func
+  # Placeholder only; inject_virtual_feed calls sha1_hash(topic), so a real
+  # hash function has to be assigned before that function is used.
+  sha1_hash = None
+
+
+class FeedFragment(db.Model):
+  """Represents a fragment of a virtual feed that will be collated.
+
+  The Key and key_name are not used; inject_virtual_feed() builds every
+  fragment with the same placeholder key name ('unused').
+
+  Fields:
+    topic: The topic of the virtual feed being collated.
+    header_footer: The feed envelope.
+    entries: The <entry>...</entry> text segments that were parsed from the
+      source feeds, already joined together with newlines.
+    format: 'rss' or 'atom'.
+  """
+  # All four fields are declared as TextProperty values.
+  topic = db.TextProperty()
+  header_footer = db.TextProperty()
+  entries = db.TextProperty()
+  format = db.TextProperty()
+
+
+# NOTE: when the right-hand side is evaluated, VIRTUAL_FEED_QUEUE still holds
+# the 'virtual-feeds' string from the Constants section; that string is passed
+# as the fourth argument (presumably the task queue name -- confirm against
+# fork_join_queue). The assignment then rebinds the name to the queue object.
+VIRTUAL_FEED_QUEUE = fork_join_queue.MemcacheForkJoinQueue(
+    FeedFragment,
+    None,
+    '/work/virtual_feeds',
+    VIRTUAL_FEED_QUEUE,
+    batch_size=20,
+    batch_period_ms=1000,
+    lock_timeout_ms=1000,
+    sync_timeout_ms=250,
+    stall_timeout_ms=30000,
+    acquire_timeout_ms=10,
+    acquire_attempts=50,
+    shard_count=1,
+    expiration_seconds=60)  # Give up on fragments after 60 seconds.
+
+
+def inject_virtual_feed(topic, format, header_footer, entries_map):
+  """Queues one virtual feed update so it can be collated and then delivered.
+
+  Args:
+    topic: The topic URL for the virtual feed.
+    format: The format of the virtual feed ('rss' or 'atom').
+    header_footer: The feed envelope to use for the whole virtual feed.
+    entries_map: Dictionary mapping feed entry IDs to strings containing
+      full entry payloads (e.g., from <entry> to </entry> including the tags).
+
+  Raises:
+    MemcacheError if the virtual feed could not be injected.
+  """
+  placeholder_key = db.Key.from_path(FeedFragment.kind(), 'unused')
+  joined_entries = '\n'.join(entries_map.values())
+  new_fragment = FeedFragment(
+      key=placeholder_key,
+      topic=topic,
+      header_footer=header_footer,
+      entries=joined_entries,
+      format=format)
+
+  # A single VIRTUAL_FEED_QUEUE object stands in for one logical queue per
+  # virtual feed topic: its name is switched to a prefix that embeds a hash
+  # of the topic URL before an index is requested.
+  topic_queue_name = 'fjq-%s-%s-' % (FeedFragment.kind(), sha1_hash(topic))
+  VIRTUAL_FEED_QUEUE.name = topic_queue_name
+  index = VIRTUAL_FEED_QUEUE.next_index()
+  try:
+    VIRTUAL_FEED_QUEUE.put(index, [new_fragment])
+  finally:
+    # add() runs whether or not put() raised.
+    VIRTUAL_FEED_QUEUE.add(index)
+
+
+class CollateFeedHandler(webapp.RequestHandler):
+  """Worker handler that collates virtual feed updates and enqueues them."""
+
+  @work_queue_only
+  def post(self):
+    """Pops the fragments for this task and writes one combined event.
+
+    The task name comes from the X-AppEngine-TaskName request header. If the
+    pop yields no fragments a warning is logged and nothing is written.
+    """
+    # Restore the pseudo-name of the queue so we can properly pop tasks
+    # from it for the specific virtual feed this task is targeting. The
+    # queue name set by inject_virtual_feed ends in '-', so the text before
+    # the first '--' in the task name is that name minus its trailing dash.
+    # Splitting only once keeps a later '--' from breaking the unpacking.
+    task_name = self.request.headers['X-AppEngine-TaskName']
+    queue_prefix, _ = task_name.split('--', 1)
+    VIRTUAL_FEED_QUEUE.name = queue_prefix + '-'
+
+    fragment_list = VIRTUAL_FEED_QUEUE.pop_request(self.request)
+    if not fragment_list:
+      logging.warning('Pop of virtual feed task %r found no fragments.',
+                      task_name)
+      return
+
+    # The first fragment supplies topic, format and envelope for the event;
+    # every fragment contributes its entries.
+    fragment = fragment_list[0]
+    entry_payloads = [f.entries for f in fragment_list]
+
+    def txn():
+      # set_parent=False and max_failures=1 are passed through to
+      # create_event_for_topic; the event is stored and then enqueued.
+      event_to_deliver = EventToDeliver.create_event_for_topic(
+          fragment.topic, fragment.format, fragment.header_footer,
+          entry_payloads, set_parent=False, max_failures=1)
+      db.put(event_to_deliver)
+      event_to_deliver.enqueue()
+
+    db.run_in_transaction(txn)
+    logging.debug('Injected %d fragments for virtual topic %r',
+                  len(fragment_list), fragment.topic)
+
+
+class VirtualFeedHook(Hook):
+  """Hook that appends the CollateFeedHandler route to the handler list."""
+
+  def inspect(self, args, kwargs):
+    # args[0] is treated as a mutable list of (path, handler) pairs.
+    handler_list = args[0]
+    route = (r'/work/virtual_feeds', CollateFeedHandler)
+    handler_list.append(route)
+    return False
+
+
+# Register the hook only when a 'register' symbol is present in this module's
+# globals; otherwise this statement does nothing.
+if 'register' in globals():
+  register(modify_handlers, VirtualFeedHook())
=======================================
--- /dev/null
+++ /trunk/nonstandard/virtual_feed_test.py     Sun Jul 11 22:24:18 2010
@@ -0,0 +1,216 @@
+#!/usr/bin/env python
+#
+# Copyright 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for the virtual_feed module."""
+
+import logging
+logging.basicConfig(format='%(levelname)-8s %(filename)s] %(message)s')
+import os
+import sys
+import unittest
+
+# Run these tests from the 'hub' directory.
+sys.path.insert(0, os.getcwd())
+os.chdir('../hub')
+sys.path.insert(0, os.getcwd())
+
+import testutil
+testutil.fix_path()
+
+from google.appengine.ext import webapp
+
+import main
+import virtual_feed
+
+################################################################################
+
+# Do aliasing that would happen anyways during Hook module loading.
+virtual_feed.sha1_hash = main.sha1_hash
+virtual_feed.EventToDeliver = main.EventToDeliver
+
+
+class InjectVirtualFeedTest(testutil.HandlerTestBase):
+  """Tests for the inject_virtual_feed function."""
+
+  def setUp(self):
+    """Sets up the test harness."""
+    testutil.setup_for_testing()
+    self.topic = 'http://example.com/my-topic/1'
+    self.topic2 = 'http://example.com/my-topic/2'
+    self.format = 'atom'
+    self.header_footer = '<feed><id>tag:my-id</id>\n</feed>'
+    self.entries_map = {
+      'one': '<entry>first data</entry>',
+      'two': '<entry>second data</entry>',
+      'three': '<entry>third data</entry>',
+    }
+    # The task-name assertions below look for the 'my-version-' part of this
+    # version id.
+    os.environ['CURRENT_VERSION_ID'] = 'my-version.1234'
+    # Route collation tasks to the 'default' queue, which the tests inspect.
+    virtual_feed.VIRTUAL_FEED_QUEUE.queue_name = 'default'
+
+  def testInsertOneFragment(self):
+    """Tests inserting one new fragment."""
+    virtual_feed.inject_virtual_feed(
+        self.topic, self.format, self.header_footer, self.entries_map)
+    task = testutil.get_tasks('default', index=0, expected_count=1)
+    self.assertTrue(task['name'].startswith(
+        'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f015e6830d41--'
+        'my-version-'))
+    results = virtual_feed.VIRTUAL_FEED_QUEUE.pop(task['name'])
+    self.assertEquals(1, len(results))
+    fragment = results[0]
+    self.assertEquals(self.topic, fragment.topic)
+    self.assertEquals(self.header_footer, fragment.header_footer)
+    self.assertEquals(self.format, fragment.format)
+    # The entries are joined from entries_map.values(), so the expected order
+    # is the dictionary's iteration order rather than insertion order.
+    self.assertEquals(
+        '<entry>third data</entry>\n'  # Hash order
+        '<entry>second data</entry>\n'
+        '<entry>first data</entry>',
+        fragment.entries)
+
+  def testInsertMultipleFragments(self):
+    """Tests inserting multiple fragments on different virtual topics."""
+    virtual_feed.inject_virtual_feed(
+        self.topic, self.format, self.header_footer, self.entries_map)
+    virtual_feed.inject_virtual_feed(
+        self.topic2, self.format, self.header_footer, self.entries_map)
+
+    # Each topic gets its own task whose name embeds that topic's hash.
+    task1, task2 = testutil.get_tasks('default', expected_count=2)
+    self.assertTrue(task1['name'].startswith(
+        'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f015e6830d41--'
+        'my-version-'))
+    self.assertTrue(task2['name'].startswith(
+        'fjq-FeedFragment-0449375bf584a7a5d3a09b344a726dead30c3927--'
+        'my-version-'))
+
+    # Point the shared queue object at each topic's logical queue name before
+    # popping that topic's fragment.
+    virtual_feed.VIRTUAL_FEED_QUEUE.name = \
+        'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f015e6830d41-'
+    fragment1 = virtual_feed.VIRTUAL_FEED_QUEUE.pop(task1['name'])[0]
+    self.assertEquals(self.topic, fragment1.topic)
+
+    virtual_feed.VIRTUAL_FEED_QUEUE.name = \
+        'fjq-FeedFragment-0449375bf584a7a5d3a09b344a726dead30c3927-'
+    fragment2 = virtual_feed.VIRTUAL_FEED_QUEUE.pop(task2['name'])[0]
+    self.assertEquals(self.topic2, fragment2.topic)
+
+
+class CollateFeedHandlerTest(testutil.HandlerTestBase):
+  """Tests for the CollateFeedHandler class."""
+
+  handler_class = virtual_feed.CollateFeedHandler
+
+  def setUp(self):
+    """Sets up the test harness."""
+    testutil.HandlerTestBase.setUp(self)
+    self.topic = 'http://example.com/my-topic/1'
+    self.topic2 = 'http://example.com/my-topic/2'
+    self.format = 'atom'
+    self.header_footer = '<feed><id>tag:my-id</id>\n</feed>'
+    self.entries_map = {
+      'one': '<entry>first data</entry>',
+      'two': '<entry>second data</entry>',
+      'three': '<entry>third data</entry>',
+    }
+    os.environ['CURRENT_VERSION_ID'] = 'my-version.1234'
+    # Route collation tasks to the 'default' queue, which the tests inspect.
+    virtual_feed.VIRTUAL_FEED_QUEUE.queue_name = 'default'
+
+  def testNoWork(self):
+    """Tests when the queue is empty."""
+    # Nothing was injected beforehand; the test only requires that the
+    # handler completes for a task name with no fragments behind it.
+    os.environ['HTTP_X_APPENGINE_TASKNAME'] = (
+      'fjq-FeedFragment-54124f41c1ea6e67e4beacac85b9f01abb6830d41--'
+      'my-version-42630240-2654435761-0')
+    self.handle('post')
+
+  def testOneFragment(self):
+    """Tests when there is one fragment in the queue."""
+    virtual_feed.inject_virtual_feed(
+        self.topic, self.format, self.header_footer, self.entries_map)
+    task = testutil.get_tasks('default', index=0, expected_count=1)
+    # Run the handler as if it were executing the enqueued collation task.
+    os.environ['HTTP_X_APPENGINE_TASKNAME'] = task['name']
+    self.handle('post')
+
+    event_list = list(main.EventToDeliver.all())
+    self.assertEquals(1, len(event_list))
+    event = event_list[0]
+
+    # No parent to ensure it's not rate limited by an entity group.
+    self.assertEquals(None, event.key().parent())
+
+    self.assertEquals(self.topic, event.topic)
+    self.assertEquals(
+      '<?xml version="1.0" encoding="utf-8"?>\n'
+      '<feed><id>tag:my-id</id>\n\n'
+      '<entry>third data</entry>\n'
+      '<entry>second data</entry>\n'
+      '<entry>first data</entry>\n'
+      '</feed>',
+      event.payload)
+    self.assertEquals('application/atom+xml', event.content_type)
+    # The handler passes max_failures=1 when it creates the event.
+    self.assertEquals(1, event.max_failures)
+
+    task = testutil.get_tasks('event-delivery', index=0, expected_count=1)
+    self.assertEquals(str(event.key()), task['params']['event_key'])
+
+  def testMultipleFragments(self):
+    """Tests when there is more than one fragment in the queue."""
+    virtual_feed.inject_virtual_feed(
+        self.topic, self.format, self.header_footer, self.entries_map)
+    virtual_feed.inject_virtual_feed(
+        self.topic, self.format, self.header_footer, self.entries_map)
+    # Two injections on the same topic are expected to share a single task.
+    task = testutil.get_tasks('default', index=0, expected_count=1)
+    os.environ['HTTP_X_APPENGINE_TASKNAME'] = task['name']
+    self.handle('post')
+
+    event_list = list(main.EventToDeliver.all())
+    self.assertEquals(1, len(event_list))
+    event = event_list[0]
+
+    self.assertEquals(self.topic, event.topic)
+    # Both fragments' entries appear in the one combined payload.
+    self.assertEquals(
+      '<?xml version="1.0" encoding="utf-8"?>\n'
+      '<feed><id>tag:my-id</id>\n\n'
+      '<entry>third data</entry>\n'
+      '<entry>second data</entry>\n'
+      '<entry>first data</entry>\n'
+      '<entry>third data</entry>\n'
+      '<entry>second data</entry>\n'
+      '<entry>first data</entry>\n'
+      '</feed>',
+      event.payload)
+
+  def testMultipleQueues(self):
+    """Tests multiple virtual feeds and queues."""
+    virtual_feed.inject_virtual_feed(
+        self.topic, self.format, self.header_footer, self.entries_map)
+    virtual_feed.inject_virtual_feed(
+        self.topic2, self.format, self.header_footer, self.entries_map)
+    task1, task2 = testutil.get_tasks('default', expected_count=2)
+
+    # Run the handler once per task; each task name selects its own topic's
+    # logical queue inside the handler.
+    os.environ['HTTP_X_APPENGINE_TASKNAME'] = task1['name']
+    self.handle('post')
+
+    os.environ['HTTP_X_APPENGINE_TASKNAME'] = task2['name']
+    self.handle('post')
+
+    event_list = list(main.EventToDeliver.all())
+    self.assertEquals(2, len(event_list))
+    self.assertEquals(self.topic, event_list[0].topic)
+    self.assertEquals(self.topic2, event_list[1].topic)
+
+################################################################################
+
+if __name__ == '__main__':
+  unittest.main()
=======================================
--- /trunk/hub/feed_diff_test.py        Thu Jun  3 15:45:54 2010
+++ /trunk/hub/feed_diff_test.py        Sun Jul 11 22:24:18 2010
@@ -303,6 +303,9 @@
       self.load_feed('xhtml_entities.xml')
       self.fail('Should have raised an exception')
     except feed_diff.Error, e:
+      # TODO(bslatkin): Fix this datafile in head.
+      # This ensures that the failure is because of bad entities, not a
+      # missing test data file.
       self.assertFalse('IOError' in str(e))


=======================================
--- /trunk/hub/fork_join_queue.py       Sun Jun 27 11:46:09 2010
+++ /trunk/hub/fork_join_queue.py       Sun Jul 11 22:24:18 2010
@@ -180,16 +180,27 @@
     self.stall_timeout = stall_timeout_ms / 1000.0
     self.acquire_timeout = acquire_timeout_ms / 1000.0
     self.acquire_attempts = acquire_attempts
-
-    self.lock_name = self.name + '-lock'
-    self.add_counter_template = self.name + '-add-lock:%d'
-    self.index_name = self.name + '-index'
     self.batch_delta = datetime.timedelta(microseconds=batch_period_ms * 1000)

   def get_queue_name(self, index):
     """Returns the name of the queue to use based on the given work index.

     This implementation ignores the index and always returns self.queue_name.
     """
     return self.queue_name

+  @property
+  def lock_name(self):
+    """Lock key prefix, rebuilt from the current queue name on every access."""
+    suffix = '-lock'
+    return self.name + suffix
+
+  @property
+  def add_counter_template(self):
+    """Returns the add counter prefix template for the current prefix name.
+
+    The result still contains a '%d' placeholder for the caller to fill in.
+    """
+    return self.name + '-add-lock:%d'
+
+  @property
+  def index_name(self):
+    """Index key prefix, rebuilt from the current queue name on every access."""
+    suffix = '-index'
+    return self.name + suffix
+
   def next_index(self,
                  memget=memcache.get,
                  memincr=memcache.incr,
@@ -226,6 +237,10 @@
         return next_index
       time.sleep(self.acquire_timeout)
     else:
+      # Force the index forward; here we're stuck in a loop where the memcache
+      # index was evicted and all new lock acquisitions are reusing old locks
+      # that were already closed off to new writers.
+      memincr(self.index_name)
       raise WriterLockError('Task adder could not increment writer lock.')

   def add(self, index, gettime=time.time):
@@ -235,23 +250,37 @@
     # memcache is evicted. This prevents new task names from overlapping with
     # old ones.
     nearest_gap = int(now_stamp / self.stall_timeout)
+    # Include major version in the task name to ensure that test tasks
+    # enqueued from a non-default major version will run in the new context
+    # instead of the default major version.
+    major_version, minor_version = os.environ['CURRENT_VERSION_ID'].split('.')
+    task_name = '%s-%s-%d-%d-%d' % (
+        self.name, major_version, nearest_gap, index, 0)
+
+    # When the batch_delta is zero, then there should be no ETA, the task
+    # should run immediately and the reader will busy wait for all writers.
+    if self.batch_delta == 0:
+      eta = None
+    else:
+      eta = datetime_from_stamp(now_stamp) + self.batch_delta
+
     try:
       taskqueue.Task(
         method='POST',
-        name='%s-%d-%d-%d' % (self.name, nearest_gap, index, 0),
+        name=task_name,
         url=self.task_path,
-        eta=datetime_from_stamp(now_stamp) + self.batch_delta
+        eta=eta
       ).add(self.get_queue_name(index))
     except taskqueue.TaskAlreadyExistsError:
       # This is okay. It means the task has already been inserted by another
       # add() call for this same batch. We're holding the lock at this point
       # so we know that job won't start yet.
       pass
-    except taskqueue.TombstonedTaskError:
+    except taskqueue.TombstonedTaskError, e:
       # This is bad. This means 1) the lock we held expired and the task already
       # ran, 2) this task name somehow overlaps with an old task. Return the
       # error to the caller so they can try again.
-      raise TaskConflictError('Task named "%s" tombstoned' % task.name)
+      raise TaskConflictError('Task named tombstoned: %s' % e)
     finally:
       # Don't bother checking the decr status; worst-case the worker job
       # will time out after some number of seconds and proceed anyways.
@@ -295,8 +324,7 @@
     return False

   def _query_work(self, index, cursor):
-    """TODO
-    """
+    """Queries for work in the Datastore."""
     query = (self.model_class.all()
         .filter('%s =' % self.index_property.name, index)
         .order('__key__'))
@@ -314,6 +342,7 @@
     Returns:
       A list of work items, if any.
     """
+    # TODO: Use request.headers['X-AppEngine-TaskName'] instead of environ.
     return self.pop(os.environ['HTTP_X_APPENGINE_TASKNAME'],
                     request.get('cursor'))

@@ -338,17 +367,22 @@
     result_list, cursor = self._query_work(index, cursor)

     if len(result_list) == self.batch_size:
-      try:
-        taskqueue.Task(
-          method='POST',
-          name='%s-%d-%d' % (rest, index, generation + 1),
-          url=self.task_path,
-          params={'cursor': cursor}
-        ).add(self.get_queue_name(index))
-      except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
-        # This means the continuation chain already started and this root
-        # task failed for some reason; no problem.
-        pass
+      for i in xrange(3):
+        try:
+          taskqueue.Task(
+            method='POST',
+            name='%s-%d-%d' % (rest, index, generation + 1),
+            url=self.task_path,
+            params={'cursor': cursor}
+          ).add(self.get_queue_name(index))
+        except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
+          # This means the continuation chain already started and this root
+          # task failed for some reason; no problem.
+          break
+        except (taskqueue.TransientError, taskqueue.InternalError):
+          # Ignore transient taskqueue errors.
+          if i == 2:
+            raise

     return result_list

=======================================
--- /trunk/hub/fork_join_queue_test.py  Sun Jun 27 11:46:09 2010
+++ /trunk/hub/fork_join_queue_test.py  Sun Jul 11 22:24:18 2010
@@ -48,7 +48,7 @@
     '/path/to/my/task',
     'default',
     batch_size=3,
-    batch_period_ms=200,
+    batch_period_ms=2200,
     lock_timeout_ms=1000,
     sync_timeout_ms=250,
     stall_timeout_ms=30000,
@@ -56,13 +56,27 @@
     acquire_attempts=20)


+# Queue configured with a zero batch period; exercised by testZeroBatchTime.
+TEST_QUEUE_ZERO_BATCH_TIME = fork_join_queue.ForkJoinQueue(
+    TestModel,
+    TestModel.work_index,
+    '/path/to/my/task',
+    'default',
+    batch_size=3,
+    batch_period_ms=0,
+    lock_timeout_ms=1000,
+    sync_timeout_ms=250,
+    stall_timeout_ms=30000,
+    acquire_timeout_ms=50,
+    acquire_attempts=20)
+
+
 SHARDED_QUEUE = fork_join_queue.ShardedForkJoinQueue(
     TestModel,
     TestModel.work_index,
     '/path/to/my/task',
     'default-%(shard)s',
     batch_size=3,
-    batch_period_ms=200,
+    batch_period_ms=2200,
     lock_timeout_ms=1000,
     sync_timeout_ms=250,
     stall_timeout_ms=30000,
@@ -77,7 +91,7 @@
     '/path/to/my/task',
     'default',
     batch_size=3,
-    batch_period_ms=200,
+    batch_period_ms=2200,
     lock_timeout_ms=1000,
     sync_timeout_ms=250,
     stall_timeout_ms=30000,
@@ -96,17 +110,26 @@
     self.now2 = 1274078097.79072
     self.gettime1 = lambda: self.now1
     self.gettime2 = lambda: self.now2
+    os.environ['CURRENT_VERSION_ID'] = 'myversion.1234'
     if 'HTTP_X_APPENGINE_TASKNAME' in os.environ:
       del os.environ['HTTP_X_APPENGINE_TASKNAME']

-  def expect_task(self, index, generation=0, now_time=None, cursor=None):
+  def expect_task(self,
+                  index,
+                  generation=0,
+                  now_time=None,
+                  cursor=None,
+                  batch_period_ms=2200000):
     """Creates an expected task dictionary."""
     if now_time is None:
       now_time = self.now1
     gap_number = int(now_time / 30.0)
+    import math
     work_item = {
-      'name': 'fjq-TestModel-%d-%d-%d' % (gap_number, index, generation),
-      'eta': int(10**6 * now_time) + 500000,
+      'name': 'fjq-TestModel-myversion-%d-%d-%d' % (
+          gap_number, index, generation),
+      # Working around weird rounding behavior of task queue stub.
+      'eta': (10**6 * now_time) + batch_period_ms,
     }
     if cursor is not None:
       work_item['cursor'] = cursor
@@ -117,7 +140,10 @@
     found_tasks.sort(key=lambda t: t['eta'])
     for expected, found in zip(expected_tasks, found_tasks):
       self.assertEquals(expected['name'], found['name'])
-      self.assertEquals(int(expected['eta'] / 10**6), int(found['eta'] / 10**6))
+      # Round these task ETAs to integers because the taskqueue stub does
+      # not support floating-point ETAs.
+      self.assertEquals(round(expected['eta'] / 10**6),
+                        round(found['eta'] / 10**6))
       self.assertEquals(expected.get('cursor'),
                         found.get('params', {}).get('cursor'))
       self.assertEquals('POST', found['method'])
@@ -223,10 +249,18 @@

   def testNextIndexBusyWaitFail(self):
     """Tests when busy waiting for a new index fails."""
+    seen_keys = []
+    def fake_incr(key, *args, **kwargs):
+      seen_keys.append(key)
+      return 100
     self.assertRaises(
         fork_join_queue.WriterLockError,
         TEST_QUEUE.next_index,
-        memincr=lambda *a, **k: 100)
+        memincr=fake_incr)
+    self.assertEquals(
+        (['fjq-TestModel-add-lock:2654435761'] * 20) +
+        ['fjq-TestModel-index'],
+        seen_keys)

   def testPopOne(self):
     """Tests popping a single entity from the queue."""
@@ -336,6 +370,16 @@
     work_index = TEST_QUEUE.next_index()
     self.assertFalse(TEST_QUEUE._increment_index(work_index))

+  def testZeroBatchTime(self):
+    """Tests that zero batch time results in no task ETA."""
+    work_index = TEST_QUEUE_ZERO_BATCH_TIME.next_index()
+    task = TestModel(work_index=work_index, number=1)
+    db.put(task)
+    TEST_QUEUE_ZERO_BATCH_TIME.add(work_index, gettime=self.gettime1)
+    # With batch_period_ms=0 the expected task's 'eta' value is just now1 in
+    # microseconds, i.e. no batching delay is added to it.
+    self.assertTasksEqual(
+      [self.expect_task(work_index, batch_period_ms=0)],
+      testutil.get_tasks('default', usec_eta=True))
+
   def testShardedQueue(self):
     """Tests adding and popping from a sharded queue with continuation."""
     from google.appengine.api import apiproxy_stub_map
=======================================
--- /trunk/hub/main.py  Sun Jun 27 12:57:44 2010
+++ /trunk/hub/main.py  Sun Jul 11 22:24:18 2010
@@ -1117,6 +1117,8 @@
       if memory_only:
         cls.FORK_JOIN_QUEUE.put(work_index, feed_list)
       else:
+        # TODO(bslatkin): Insert fetching tasks here to fix the polling
+        # mode for this codebase.
         db.put(feed_list)
     finally:
       if memory_only:
@@ -1424,14 +1426,21 @@
   last_callback = db.TextProperty(default='')  # For paging Subscriptions
   failed_callbacks = db.ListProperty(db.Key)  # Refs to Subscription entities
   delivery_mode = db.StringProperty(default=NORMAL, choices=DELIVERY_MODES)
-  retry_attempts = db.IntegerProperty(default=0)
+  retry_attempts = db.IntegerProperty(default=0, indexed=False)
   last_modified = db.DateTimeProperty(required=True)
   totally_failed = db.BooleanProperty(default=False)
   content_type = db.TextProperty(default='')
+  # Optional per-event failure limit; it stays None unless a value is passed
+  # to create_event_for_topic(max_failures=...).
+  max_failures = db.IntegerProperty(indexed=False)

   @classmethod
-  def create_event_for_topic(cls, topic, format, header_footer, entry_payloads,
-                             now=datetime.datetime.utcnow):
+  def create_event_for_topic(cls,
+                             topic,
+                             format,
+                             header_footer,
+                             entry_payloads,
+                             now=datetime.datetime.utcnow,
+                             set_parent=True,
+                             max_failures=None):
     """Creates an event to deliver for a topic and set of published entries.

     Args:
@@ -1443,6 +1452,12 @@
         XML data for each entry, including surrounding tags) in order of newest
         to oldest.
       now: Returns the current time as a UTC datetime. Used in tests.
+      set_parent: Set the parent to the FeedRecord for the given topic. This is
+        necessary for the parse_feed flow's transaction. Default is True. Set
+        to False if this EventToDeliver will be written outside of the
+        FeedRecord transaction.
+      max_failures: Maximum number of failures to allow for this event. When
+        None (the default) it will use the MAX_DELIVERY_FAILURES constant.

     Returns:
       A new EventToDeliver instance that has not been stored.
@@ -1469,14 +1484,20 @@
     payload_list.append(header_footer[close_index:])
     payload = '\n'.join(payload_list)

+    if set_parent:
+      parent = db.Key.from_path(
+          FeedRecord.kind(), FeedRecord.create_key_name(topic))
+    else:
+      parent = None
+
     return cls(
-        parent=db.Key.from_path(
-            FeedRecord.kind(), FeedRecord.create_key_name(topic)),
+        parent=parent,
         topic=topic,
         topic_hash=sha1_hash(topic),
         payload=payload,
         last_modified=now(),
-        content_type=content_type)
+        content_type=content_type,
+        max_failures=max_failures)

   def get_next_subscribers(self, chunk_size=None):
     """Retrieve the next set of subscribers to attempt delivery for this event.
@@ -1577,6 +1598,8 @@
       retry_delay = retry_period * (2 ** self.retry_attempts)
       self.last_modified += datetime.timedelta(seconds=retry_delay)
       self.retry_attempts += 1
+      if self.max_failures is not None:
+        max_failures = self.max_failures
       if self.retry_attempts > max_failures:
         self.totally_failed = True

=======================================
--- /trunk/hub/main_test.py     Sun Jun 27 11:46:09 2010
+++ /trunk/hub/main_test.py     Sun Jul 11 22:24:18 2010
@@ -1135,6 +1135,30 @@
     testutil.get_tasks(main.EVENT_QUEUE, expected_count=1)
     testutil.get_tasks(main.POLLING_QUEUE, expected_count=1)

+  def testMaxFailuresOverride(self):
+    """Tests the max_failures override value."""
+    # Without the keyword argument the property is left as None.
+    event = EventToDeliver.create_event_for_topic(
+        self.topic, main.ATOM, self.header_footer, self.test_payloads)
+    self.assertEquals(None, event.max_failures)
+
+    event = EventToDeliver.create_event_for_topic(
+        self.topic, main.ATOM, self.header_footer, self.test_payloads,
+        max_failures=1)
+    self.assertEquals(1, event.max_failures)
+
+    Subscription.insert(
+        self.callback, self.topic, self.token, self.secret)
+    subscription_list = list(Subscription.all())
+
+    # First update: with max_failures=1 the event must not yet be marked as
+    # totally failed.
+    event.put()
+    event.update(False, subscription_list)
+    event2 = db.get(event.key())
+    self.assertFalse(event2.totally_failed)
+
+    # Second update: the event is now expected to be marked totally failed
+    # (presumably because retry_attempts has exceeded the override -- confirm
+    # against EventToDeliver.update).
+    event2.update(False, [])
+    event3 = db.get(event.key())
+    self.assertTrue(event3.totally_failed)
+
################################################################################

 class PublishHandlerTest(testutil.HandlerTestBase):
=======================================
--- /trunk/hub/testutil.py      Mon May 17 01:57:39 2010
+++ /trunk/hub/testutil.py      Sun Jul 11 22:24:18 2010
@@ -29,10 +29,12 @@


 TEST_APP_ID = 'my-app-id'
+TEST_VERSION_ID = 'my-version.1234'

 # Assign the application ID up front here so we can create db.Key instances
 # before doing any other test setup.
 os.environ['APPLICATION_ID'] = TEST_APP_ID
+os.environ['CURRENT_VERSION_ID'] = TEST_VERSION_ID


 def fix_path():

Reply via email to