Revision: 376
Author: bslatkin
Date: Sun Jul 11 22:57:08 2010
Log: hub: more forkjoin tweaks, helpful comments
http://code.google.com/p/pubsubhubbub/source/detail?r=376
Modified:
/trunk/hub/async_apiproxy.py
/trunk/hub/fork_join_queue.py
/trunk/hub/fork_join_queue_test.py
/trunk/hub/main.py
=======================================
--- /trunk/hub/async_apiproxy.py Sat Jun 5 18:56:52 2010
+++ /trunk/hub/async_apiproxy.py Sun Jul 11 22:57:08 2010
@@ -66,11 +66,16 @@
if not callable(user_callback):
raise TypeError('%r not callable' % user_callback)
+ # Do not actually supply the callback to the async call function because
+ # when it runs it could interfere with global state (like Datastore
+ # transactions). The callback will be run from the wait_one() function.
done_callback = lambda: user_callback(pbresponse, None)
rpc = AsyncRPC(package, call, pbrequest, pbresponse,
lambda: self.end_call(done_callback),
deadline=deadline)
setattr(rpc, 'user_callback', user_callback)
+ setattr(rpc, 'pbresponse', pbresponse)
+
self.enqueued.append(rpc)
show_request = '...'
if rpc.package == 'urlfetch':
@@ -122,8 +127,10 @@
def wait(self):
"""Wait for RPCs to finish. Returns True if any were processed."""
while self.enqueued:
+ # Run the callbacks before even waiting, because a response could have
+ # come back during any outbound API call.
+ self._run_callbacks()
self._wait_one()
self._run_callbacks()
- else:
- return False
- return True
+ # Run them one last time after waiting to pick up the final callback!
+ self._run_callbacks()
=======================================
--- /trunk/hub/fork_join_queue.py Sun Jul 11 22:24:18 2010
+++ /trunk/hub/fork_join_queue.py Sun Jul 11 22:57:08 2010
@@ -180,7 +180,10 @@
self.stall_timeout = stall_timeout_ms / 1000.0
self.acquire_timeout = acquire_timeout_ms / 1000.0
self.acquire_attempts = acquire_attempts
- self.batch_delta = datetime.timedelta(microseconds=batch_period_ms * 1000)
+ if batch_period_ms == 0:
+ self.batch_delta = None
+ else:
+ self.batch_delta = datetime.timedelta(microseconds=batch_period_ms * 1000)
def get_queue_name(self, index):
"""Returns the name of the queue to use based on the given work
index."""
@@ -257,9 +260,9 @@
task_name = '%s-%s-%d-%d-%d' % (
self.name, major_version, nearest_gap, index, 0)
- # When the batch_delta is zero, then there should be no ETA, the task
+ # When the batch_period_ms is zero, then there should be no ETA, the task
# should run immediately and the reader will busy wait for all writers.
- if self.batch_delta == 0:
+ if self.batch_delta is None:
eta = None
else:
eta = datetime_from_stamp(now_stamp) + self.batch_delta
@@ -271,6 +274,12 @@
url=self.task_path,
eta=eta
).add(self.get_queue_name(index))
+ if self.batch_delta is None:
+ # When the batch_period_ms is zero, we want to immediately move the
+ # index to the next position as soon as the current batch finishes
+ # writing its task. This will only run for the first successful task
+ # inserter.
+ memcache.incr(self.index_name)
except taskqueue.TaskAlreadyExistsError:
# This is okay. It means the task has already been inserted by another
# add() call for this same batch. We're holding the lock at this point
@@ -301,6 +310,9 @@
# do this when it notices no current index is present. Do this *before*
# closing the reader/writer lock below to decrease active writers on the
# current index.
+ # We do this even in the case that batch_period_ms was zero, just in case
+ # that memcache operation failed for some reason, we'd rather have more
+ # batches than have the work index pipeline stall.
memcache.incr(self.index_name)
# Prevent new writers by making the counter extremely negative. If the
@@ -375,6 +387,7 @@
url=self.task_path,
params={'cursor': cursor}
).add(self.get_queue_name(index))
+ break
except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError):
# This means the continuation chain already started and this root
# task failed for some reason; no problem.
@@ -471,7 +484,12 @@
def _query_work(self, index, cursor):
"""Queries for work in memcache."""
if cursor:
- cursor = int(cursor)
+ try:
+ cursor = int(cursor)
+ except ValueError:
+ # This is an old style task that resides in the Datastore, not
+ # memcache. Use the parent implementation instead.
+ return super(MemcacheForkJoinQueue, self)._query_work(index, cursor)
else:
cursor = 0
=======================================
--- /trunk/hub/fork_join_queue_test.py Sun Jul 11 22:24:18 2010
+++ /trunk/hub/fork_join_queue_test.py Sun Jul 11 22:57:08 2010
@@ -135,15 +135,17 @@
work_item['cursor'] = cursor
return work_item
- def assertTasksEqual(self, expected_tasks, found_tasks):
+ def assertTasksEqual(self, expected_tasks, found_tasks,
+ check_eta=True):
"""Asserts two lists of tasks are equal."""
found_tasks.sort(key=lambda t: t['eta'])
for expected, found in zip(expected_tasks, found_tasks):
self.assertEquals(expected['name'], found['name'])
- # Round these task ETAs to integers because the taskqueue stub does
- # not support floating-point ETAs.
- self.assertEquals(round(expected['eta'] / 10**6),
- round(found['eta'] / 10**6))
+ if check_eta:
+ # Round these task ETAs to integers because the taskqueue stub does
+ # not support floating-point ETAs.
+ self.assertEquals(round(expected['eta'] / 10**6),
+ round(found['eta'] / 10**6))
self.assertEquals(expected.get('cursor'),
found.get('params', {}).get('cursor'))
self.assertEquals('POST', found['method'])
@@ -375,10 +377,21 @@
work_index = TEST_QUEUE_ZERO_BATCH_TIME.next_index()
task = TestModel(work_index=work_index, number=1)
db.put(task)
+
+ before_index = memcache.get(TEST_QUEUE_ZERO_BATCH_TIME.index_name)
+ self.assertEquals(
+ work_index, fork_join_queue.knuth_hash(before_index))
TEST_QUEUE_ZERO_BATCH_TIME.add(work_index, gettime=self.gettime1)
+
+ # This confirms the behavior that batch_period_ms of zero will cause
+ # immediate increment after adding the tasks.
+ after_index = memcache.get(TEST_QUEUE_ZERO_BATCH_TIME.index_name)
+ self.assertEquals(before_index + 1, after_index)
+
self.assertTasksEqual(
- [self.expect_task(work_index, batch_period_ms=0)],
- testutil.get_tasks('default', usec_eta=True))
+ [self.expect_task(work_index)],
+ testutil.get_tasks('default', usec_eta=True),
+ check_eta=False)
def testShardedQueue(self):
"""Tests adding and popping from a sharded queue with continuation."""
=======================================
--- /trunk/hub/main.py Sun Jul 11 22:24:18 2010
+++ /trunk/hub/main.py Sun Jul 11 22:57:08 2010
@@ -1430,7 +1430,7 @@
last_modified = db.DateTimeProperty(required=True)
totally_failed = db.BooleanProperty(default=False)
content_type = db.TextProperty(default='')
- max_failures = db.IntegerProperty(indexed=False, indexed=False)
+ max_failures = db.IntegerProperty(indexed=False)
@classmethod
def create_event_for_topic(cls,