Revision: 376
Author: bslatkin
Date: Sun Jul 11 22:57:08 2010
Log: hub: more forkjoin tweaks, helpful comments
http://code.google.com/p/pubsubhubbub/source/detail?r=376

Modified:
 /trunk/hub/async_apiproxy.py
 /trunk/hub/fork_join_queue.py
 /trunk/hub/fork_join_queue_test.py
 /trunk/hub/main.py

=======================================
--- /trunk/hub/async_apiproxy.py        Sat Jun  5 18:56:52 2010
+++ /trunk/hub/async_apiproxy.py        Sun Jul 11 22:57:08 2010
@@ -66,11 +66,16 @@
     if not callable(user_callback):
       raise TypeError('%r not callable' % user_callback)

+    # Do not actually supply the callback to the async call function because
+    # when it runs it could interfere with global state (like Datastore
+    # transactions). The callback will be run from the wait_one() function.
     done_callback = lambda: user_callback(pbresponse, None)
     rpc = AsyncRPC(package, call, pbrequest, pbresponse,
                    lambda: self.end_call(done_callback),
                    deadline=deadline)
     setattr(rpc, 'user_callback', user_callback)
+    setattr(rpc, 'pbresponse', pbresponse)
+
     self.enqueued.append(rpc)
     show_request = '...'
     if rpc.package == 'urlfetch':
@@ -122,8 +127,10 @@
   def wait(self):
     """Wait for RPCs to finish. Returns True if any were processed."""
     while self.enqueued:
+      # Run the callbacks before even waiting, because a response could have
+      # come back during any outbound API call.
+      self._run_callbacks()
       self._wait_one()
       self._run_callbacks()
-    else:
-      return False
-    return True
+    # Run them one last time after waiting to pick up the final callback!
+    self._run_callbacks()
=======================================
--- /trunk/hub/fork_join_queue.py       Sun Jul 11 22:24:18 2010
+++ /trunk/hub/fork_join_queue.py       Sun Jul 11 22:57:08 2010
@@ -180,7 +180,10 @@
     self.stall_timeout = stall_timeout_ms / 1000.0
     self.acquire_timeout = acquire_timeout_ms / 1000.0
     self.acquire_attempts = acquire_attempts
-    self.batch_delta = datetime.timedelta(microseconds=batch_period_ms * 1000)
+    if batch_period_ms == 0:
+      self.batch_delta = None
+    else:
+      self.batch_delta = datetime.timedelta(microseconds=batch_period_ms * 1000)

   def get_queue_name(self, index):
"""Returns the name of the queue to use based on the given work index."""
@@ -257,9 +260,9 @@
     task_name = '%s-%s-%d-%d-%d' % (
         self.name, major_version, nearest_gap, index, 0)

-    # When the batch_delta is zero, then there should be no ETA, the task
+    # When the batch_period_ms is zero, then there should be no ETA, the task
     # should run immediately and the reader will busy wait for all writers.
-    if self.batch_delta == 0:
+    if self.batch_delta is None:
       eta = None
     else:
       eta = datetime_from_stamp(now_stamp) + self.batch_delta
@@ -271,6 +274,12 @@
         url=self.task_path,
         eta=eta
       ).add(self.get_queue_name(index))
+      if self.batch_delta is None:
+        # When the batch_period_ms is zero, we want to immediately move the
+        # index to the next position as soon as the current batch finishes
+        # writing its task. This will only run for the first successful task
+        # inserter.
+        memcache.incr(self.index_name)
     except taskqueue.TaskAlreadyExistsError:
      # This is okay. It means the task has already been inserted by another
      # add() call for this same batch. We're holding the lock at this point
@@ -301,6 +310,9 @@
     # do this when it notices no current index is present. Do this *before*
    # closing the reader/writer lock below to decrease active writers on the
     # current index.
+    # We do this even in the case that batch_period_ms was zero, just in case
+    # that memcache operation failed for some reason, we'd rather have more
+    # batches than have the work index pipeline stall.
     memcache.incr(self.index_name)

     # Prevent new writers by making the counter extremely negative. If the
@@ -375,6 +387,7 @@
             url=self.task_path,
             params={'cursor': cursor}
           ).add(self.get_queue_name(index))
+          break
except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
           # This means the continuation chain already started and this root
           # task failed for some reason; no problem.
@@ -471,7 +484,12 @@
   def _query_work(self, index, cursor):
     """Queries for work in memcache."""
     if cursor:
-      cursor = int(cursor)
+      try:
+        cursor = int(cursor)
+      except ValueError:
+        # This is an old style task that resides in the Datastore, not
+        # memcache. Use the parent implementation instead.
+        return super(MemcacheForkJoinQueue, self)._query_work(index, cursor)
     else:
       cursor = 0

=======================================
--- /trunk/hub/fork_join_queue_test.py  Sun Jul 11 22:24:18 2010
+++ /trunk/hub/fork_join_queue_test.py  Sun Jul 11 22:57:08 2010
@@ -135,15 +135,17 @@
       work_item['cursor'] = cursor
     return work_item

-  def assertTasksEqual(self, expected_tasks, found_tasks):
+  def assertTasksEqual(self, expected_tasks, found_tasks,
+                       check_eta=True):
     """Asserts two lists of tasks are equal."""
     found_tasks.sort(key=lambda t: t['eta'])
     for expected, found in zip(expected_tasks, found_tasks):
       self.assertEquals(expected['name'], found['name'])
-      # Round these task ETAs to integers because the taskqueue stub does
-      # not support floating-point ETAs.
-      self.assertEquals(round(expected['eta'] / 10**6),
-                        round(found['eta'] / 10**6))
+      if check_eta:
+        # Round these task ETAs to integers because the taskqueue stub does
+        # not support floating-point ETAs.
+        self.assertEquals(round(expected['eta'] / 10**6),
+                          round(found['eta'] / 10**6))
       self.assertEquals(expected.get('cursor'),
                         found.get('params', {}).get('cursor'))
       self.assertEquals('POST', found['method'])
@@ -375,10 +377,21 @@
     work_index = TEST_QUEUE_ZERO_BATCH_TIME.next_index()
     task = TestModel(work_index=work_index, number=1)
     db.put(task)
+
+    before_index = memcache.get(TEST_QUEUE_ZERO_BATCH_TIME.index_name)
+    self.assertEquals(
+        work_index, fork_join_queue.knuth_hash(before_index))
     TEST_QUEUE_ZERO_BATCH_TIME.add(work_index, gettime=self.gettime1)
+
+    # This confirms the behavior that batch_period_ms of zero will cause
+    # immediate increment after adding the tasks.
+    after_index = memcache.get(TEST_QUEUE_ZERO_BATCH_TIME.index_name)
+    self.assertEquals(before_index + 1, after_index)
+
     self.assertTasksEqual(
-      [self.expect_task(work_index, batch_period_ms=0)],
-      testutil.get_tasks('default', usec_eta=True))
+      [self.expect_task(work_index)],
+      testutil.get_tasks('default', usec_eta=True),
+      check_eta=False)

   def testShardedQueue(self):
     """Tests adding and popping from a sharded queue with continuation."""
=======================================
--- /trunk/hub/main.py  Sun Jul 11 22:24:18 2010
+++ /trunk/hub/main.py  Sun Jul 11 22:57:08 2010
@@ -1430,7 +1430,7 @@
   last_modified = db.DateTimeProperty(required=True)
   totally_failed = db.BooleanProperty(default=False)
   content_type = db.TextProperty(default='')
-  max_failures = db.IntegerProperty(indexed=False, indexed=False)
+  max_failures = db.IntegerProperty(indexed=False)

   @classmethod
   def create_event_for_topic(cls,

Reply via email to