Revision: 368
Author: bslatkin
Date: Sat Jun 5 18:56:52 2010
Log: hub: fix race condition on asynchronous RPC callbacks
http://code.google.com/p/pubsubhubbub/source/detail?r=368
Modified:
/trunk/hub/async_apiproxy.py
=======================================
--- /trunk/hub/async_apiproxy.py Thu Jan 21 19:42:07 2010
+++ /trunk/hub/async_apiproxy.py Sat Jun 5 18:56:52 2010
@@ -58,6 +58,7 @@
# asynchronous queue and better catch any funny race-conditions or
# unclear event ordering dependencies.
self.enqueued = collections.deque()
+ self.complete = collections.deque()
def start_call(self, package, call, pbrequest, pbresponse, user_callback,
deadline=None):
@@ -65,10 +66,11 @@
if not callable(user_callback):
raise TypeError('%r not callable' % user_callback)
+ done_callback = lambda: user_callback(pbresponse, None)
rpc = AsyncRPC(package, call, pbrequest, pbresponse,
- lambda: user_callback(pbresponse, None),
+ lambda: self.end_call(done_callback),
deadline=deadline)
- setattr(rpc, 'user_callback', user_callback) # TODO make this pretty
+ setattr(rpc, 'user_callback', user_callback)
self.enqueued.append(rpc)
show_request = '...'
if rpc.package == 'urlfetch':
@@ -77,15 +79,18 @@
rpc.package, rpc.call, show_request)
rpc.MakeCall()
+ def end_call(self, rpc):
+ """An outstanding RPC has completed, enqueue its callback for
execution."""
+ self.complete.append(rpc)
+
def rpcs_outstanding(self):
"""Returns the number of asynchronous RPCs pending in this proxy."""
return len(self.enqueued)
- def wait_one(self):
- """Wait for a single RPC to finish. Returns True if one was
processed."""
+ def _wait_one(self):
+ """Wait for a single RPC to finish."""
if not self.enqueued:
- return False
-
+ return
rpc = self.enqueued.popleft()
logging.debug('Waiting for RPC(%s, %s, .., ..)', rpc.package, rpc.call)
rpc.Wait()
@@ -93,12 +98,32 @@
rpc.CheckSuccess()
except (apiproxy_errors.Error, apiproxy_errors.ApplicationError), e:
rpc.user_callback(None, e)
- return True
+
+ def _run_callbacks(self):
+ """Runs a single RPC's success callback.
+
+ Callbacks are run in a loop like this from the wait() callstack to
avoid
+ race conditions from the APIProxy. Any API call can cause asynchronous
+ callbacks to fire before the main thread goes to sleep, which means a
+ user callback could run *before* a Commit() call finishes; this causes
+ really bad situations when the user callback also does some API calls.
To
+ handle this properly, all callbacks will just go onto the completion
+ queue, and then run at the top of the stack here after at least one RPC
+ waiting period has finished.
+ """
+ while True:
+ try:
+ callback = self.complete.popleft()
+ except IndexError:
+ return
+ else:
+ callback()
def wait(self):
- """Wait for RPCs to finish. Returns True if any were processed."""
+ """Wait for RPCs to finish. Returns True if any were processed."""
while self.enqueued:
- self.wait_one()
+ self._wait_one()
+ self._run_callbacks()
else:
return False
return True