Revision: 368
Author: bslatkin
Date: Sat Jun  5 18:56:52 2010
Log: hub: fix race condition on asynchronous RPC callbacks
http://code.google.com/p/pubsubhubbub/source/detail?r=368

Modified:
 /trunk/hub/async_apiproxy.py

=======================================
--- /trunk/hub/async_apiproxy.py        Thu Jan 21 19:42:07 2010
+++ /trunk/hub/async_apiproxy.py        Sat Jun  5 18:56:52 2010
@@ -58,6 +58,7 @@
     # asynchronous queue and better catch any funny race-conditions or
     # unclear event ordering dependencies.
     self.enqueued = collections.deque()
+    self.complete = collections.deque()

   def start_call(self, package, call, pbrequest, pbresponse, user_callback,
                  deadline=None):
@@ -65,10 +66,11 @@
     if not callable(user_callback):
       raise TypeError('%r not callable' % user_callback)

+    done_callback = lambda: user_callback(pbresponse, None)
     rpc = AsyncRPC(package, call, pbrequest, pbresponse,
-                   lambda: user_callback(pbresponse, None),
+                   lambda: self.end_call(done_callback),
                    deadline=deadline)
-    setattr(rpc, 'user_callback', user_callback) # TODO make this pretty
+    setattr(rpc, 'user_callback', user_callback)
     self.enqueued.append(rpc)
     show_request = '...'
     if rpc.package == 'urlfetch':
@@ -77,15 +79,18 @@
                   rpc.package, rpc.call, show_request)
     rpc.MakeCall()

+  def end_call(self, rpc):
+    """An outstanding RPC has completed, enqueue its callback for execution."""
+    self.complete.append(rpc)
+
   def rpcs_outstanding(self):
     """Returns the number of asynchronous RPCs pending in this proxy."""
     return len(self.enqueued)

-  def wait_one(self):
-    """Wait for a single RPC to finish. Returns True if one was processed."""
+  def _wait_one(self):
+    """Wait for a single RPC to finish."""
     if not self.enqueued:
-      return False
-
+      return
     rpc = self.enqueued.popleft()
     logging.debug('Waiting for RPC(%s, %s, .., ..)', rpc.package, rpc.call)
     rpc.Wait()
@@ -93,12 +98,32 @@
       rpc.CheckSuccess()
     except (apiproxy_errors.Error, apiproxy_errors.ApplicationError), e:
       rpc.user_callback(None, e)
-    return True
+
+  def _run_callbacks(self):
+    """Runs a single RPC's success callback.
+
+    Callbacks are run in a loop like this from the wait() callstack to avoid
+    race conditions from the APIProxy. Any API call can cause asynchronous
+    callbacks to fire before the main thread goes to sleep, which means a
+    user callback could run *before* a Commit() call finishes; this causes
+    really bad situations when the user callback also does some API calls. To
+    handle this properly, all callbacks will just go onto the completion
+    queue, and then run at the top of the stack here after at least one RPC
+    waiting period has finished.
+    """
+    while True:
+      try:
+        callback = self.complete.popleft()
+      except IndexError:
+        return
+      else:
+        callback()

   def wait(self):
-    """Wait for RPCs to finish.  Returns True if any were processed."""
+    """Wait for RPCs to finish. Returns True if any were processed."""
     while self.enqueued:
-      self.wait_one()
+      self._wait_one()
+      self._run_callbacks()
     else:
       return False
     return True

Reply via email to