Wow, this is nice! I had sort of given up finding the cause of this :-(
Thank you for looking at this, and just in time for my presentation at
PKC in 10 days :-)

You're welcome. :-)

--- /usr/lib/python2.5/site-packages/twisted/internet/base.py   2008-07-29 22:13:54.000000000 +0200
+++ internet/base.py    2009-02-20 12:27:42.000000000 +0100
@@ -1127,17 +1127,32 @@
         self.startRunning(installSignalHandlers=installSignalHandlers)
         self.mainLoop()
+
+    def setLoopCall(self, f, *args, **kw):
+        self.loopCall = (f, args, kw)
+
+
+    def myIteration(self, t):
+        # Advance simulation time in delayed event
+        # processors.
+        self.runUntilCurrent()
+
+        if (t is None):
+            t2 = self.timeout()
+            t = self.running and t2
+
+        self.doIteration(t)
+
+        if ("loopCall" in dir(self)):
+            f, args, kw = self.loopCall
+            f(*args, **kw)
+
     def mainLoop(self):
         while self._started:
             try:
                 while self._started:
-                    # Advance simulation time in delayed event
-                    # processors.
-                    self.runUntilCurrent()
-                    t2 = self.timeout()
-                    t = self.running and t2
-                    self.doIteration(t)
+                    self.myIteration(None)
             except:
                 log.msg("Unexpected error in main loop.")
                 log.err()

The changes above basically insert a call to self.loopCall after each
doIteration invocation, right?

When the loopCall is process_deferred_queue the main loop becomes:

  self.runUntilCurrent()
  self.doIteration(t)
  runtime.process_deferred_queue

  self.runUntilCurrent()
  self.doIteration(t)
  runtime.process_deferred_queue

  ...

Yes, exactly.

I think we would get the same result if we started a LoopingCall that
executes process_deferred_queue with an interval of, say, 100 ms:

  http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html

This should work since the runUntilCurrent method runs through the
waiting calls and will trigger our process_deferred_queue method.

And voilà -- no hacking of the Twisted source needed.

I'm not sure, but LoopingCall._reschedule() looks as if it schedules the calls at fixed ticks rather than as soon as possible after the interval has elapsed. This might not cost much time, but it doesn't feel very elegant. Furthermore, setting the interval very low leads to high CPU usage while waiting. Again, this is not too bad, but it is not elegant either. The same applies when using reactor.callLater() directly.
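
To make the tick concern concrete, here is a minimal sketch of the two scheduling policies being contrasted. It is not taken from the Twisted source: both helper functions and their parameters are hypothetical, and times are plain integers in milliseconds.

```python
def next_delay_tick_aligned(start, interval, now):
    """Delay until the next tick (start + k * interval) strictly after `now`.

    Hypothetical model of tick-based rescheduling, not Twisted's code.
    """
    ticks_done = (now - start) // interval
    next_tick = start + (ticks_done + 1) * interval
    return next_tick - now


def next_delay_asap(interval, last_started, now):
    """Delay if the next call is due `interval` after the previous call
    started, and runs immediately once that point has already passed.

    Hypothetical 'as soon as possible' model.
    """
    return max(0, last_started + interval - now)


# A call started at t=0 with a 100 ms interval, but only finished at t=130.
late_tick = next_delay_tick_aligned(start=0, interval=100, now=130)
late_asap = next_delay_asap(interval=100, last_started=0, now=130)
```

Under these assumptions the tick-aligned policy waits for the tick at t=200, while the other policy runs again right away; with a short interval the difference is small, which matches the "not too bad" above.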

Of course, we can avoid hacking the Twisted code by extending it within VIFF. Still, I'm in favor of the two-threaded solution because it's more elegant, doesn't have the danger of recursing too deep, and, in my opinion, it should be feasible.

diff -r e2759515f57f viff/runtime.py
--- a/viff/runtime.py   Thu Mar 05 21:02:57 2009 +0100
+++ b/viff/runtime.py   Fri Mar 06 13:43:14 2009 +0100
@@ -306,6 +306,8 @@
                 deferred = deq.popleft()
                 if not deq:
                     del self.incoming_data[key]
+                # just queue
+                self.factory.runtime.queue_deferred(deferred)

Why is this done?

At this time, we shouldn't call the callbacks because we might recurse into selectreactor.doSelect(). However, we want to know which deferreds are ready so we can call deferred.callback() later.

+        #: Activation depth counter
+        self.depth_counter = 0

This is for keeping track of the recursion depth in the future?

Yes. It was used in some debug output earlier but I removed it to simplify the patch.

+    def queue_deferred(self, deferred):
+        deferred.pause()
+        self.deferred_queue.append(deferred)
+
+    def process_deferred_queue(self):
+        while(self.deferred_queue):
+            deferred = self.deferred_queue.pop(0)
+            deferred.unpause()

Are you doing it this way to ensure that the Deferreds are unpaused in
the same order as they were added to the list?

Yes. I'm not sure whether this is really necessary, but it just seems cleaner because the callback of some deferred might do a lot of computation and recurse, which unnecessarily extends the lifetime of the remaining deferreds.

If that doesn't matter, then I think this would be faster:

  queue, self.deferred_queue = self.deferred_queue, []
  map(Deferred.unpause, queue)

My idea is that looping over the list with map is faster than repeatedly
popping items from the beginning (an O(n) operation).

But map() still would need O(n) time because that is the nature of calling a function n times, isn't it? Maybe the function calls are optimized but the code in the function still is called n times.
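
For reference, the cost difference lies in the pop rather than in the number of calls: list.pop(0) shifts all remaining elements, so draining n items that way is O(n^2) in total, while a single pass over the list is O(n). The sketch below compares three ways of draining the queue. FakeDeferred is a hypothetical stand-in for Twisted's Deferred that only records the order of unpausing, and the function names are made up for this illustration.

```python
from collections import deque


class FakeDeferred:
    """Hypothetical stand-in for Deferred; records when it is unpaused."""

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.paused = 0

    def pause(self):
        self.paused += 1

    def unpause(self):
        self.paused -= 1
        if self.paused == 0:
            self.log.append(self.name)


def make_queue(log, n=5):
    """Build a list of n paused fake deferreds named 0..n-1."""
    queue = []
    for i in range(n):
        d = FakeDeferred(i, log)
        d.pause()
        queue.append(d)
    return queue


def drain_pop0(queue):
    # list.pop(0) is O(n) per call, so the whole loop is O(n^2).
    while queue:
        queue.pop(0).unpause()


def drain_snapshot(queue):
    # Take the current contents, empty the list, iterate once: O(n).
    # Items queued while the loop runs wait for the next drain.
    pending = list(queue)
    del queue[:]
    for deferred in pending:
        deferred.unpause()


def drain_deque(queue):
    # deque.popleft() is O(1); items queued while the loop runs are
    # still handled in the same drain, as in the pop(0) version.
    while queue:
        queue.popleft().unpause()


log_pop0, log_snapshot, log_deque = [], [], []
drain_pop0(make_queue(log_pop0))
drain_snapshot(make_queue(log_snapshot))
drain_deque(deque(make_queue(log_deque)))
```

All three unpause in insertion order. The explicit for loop is used instead of map() because map() is lazy in Python 3, so a bare map(Deferred.unpause, queue) would not call anything there.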

+    def activate_reactor(self):
+        self.activation_counter += 1
+
+        # setting the number to n makes the reactor called
+        # only every n-th time
+        if (self.activation_counter >= 2):
+            self.depth_counter += 1
+            reactor.myIteration(0)
+            self.depth_counter -= 1
+            self.activation_counter = 0

If we remove the myIteration code above, we'll have to move the calls to
reactor.runUntilCurrent and reactor.doIteration here.

A question springs to my mind: calling

  reactor.runUntilCurrent()
  reactor.doIteration(0)

is the same as calling

  reactor.iterate(0)

and the documentation for that method says:

  [...] This method is not re-entrant: you must not call it recursively;
  in particular, you must not call it while the reactor is running.

  
http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.interfaces.IReactorCore.html#iterate

How does your code ensure that we only call myIteration when we're not
in a call made by the reactor? And could we simply call reactor.iterate
instead?

We actually call it recursively, but it should be reentrant as long as it's not called from doIteration(). doIteration() is the same as selectreactor.doSelect(), which certainly is not reentrant. We, however, call it from the loop call (process_deferred_queue()) after doIteration().

Calling reactor.iterate() is not enough because it doesn't call process_deferred_queue(). The principle is that each iteration not only performs the communication via doSelect() but also runs the callbacks via process_deferred_queue(). Only the latter triggers further communication and ends the lifetime of deferreds, which frees memory.

The following graph illustrates my hack:

                          myIteration()
                         /            \
                   doSelect()     process_deferred_queue()
                       |                    |
                 communication          callbacks
                       |                    |
                stringReceived()    mul() / open() etc.
                       |                    |
                queue_deferred()       myIteration()
                                            |
                                           ...
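
The control flow in the graph can be modelled without Twisted. The following is a toy sketch only: ToyReactor and its methods are hypothetical names, do_select() stands in for doSelect(), and every callback re-enters the loop the way a long computation would.

```python
class ToyReactor:
    """Hypothetical model of the patched main loop; not Twisted code."""

    def __init__(self, incoming):
        self.incoming = list(incoming)  # data "arriving on the network"
        self.deferred_queue = []
        self.trace = []
        self.depth = 0
        self.max_depth = 0

    def do_select(self):
        # Communication step: only queue the ready item and never run
        # callbacks here, so do_select() is never entered recursively.
        if self.incoming:
            item = self.incoming.pop(0)
            self.trace.append(("received", item, self.depth))
            self.deferred_queue.append(item)

    def process_deferred_queue(self):
        # Callback step: runs after do_select() has returned.
        while self.deferred_queue:
            self.callback(self.deferred_queue.pop(0))

    def callback(self, item):
        self.trace.append(("callback", item, self.depth))
        # Stands in for mul() / open() etc. activating the reactor again.
        self.my_iteration()

    def my_iteration(self):
        self.depth += 1
        self.max_depth = max(self.max_depth, self.depth)
        self.do_select()
        self.process_deferred_queue()
        self.depth -= 1


toy = ToyReactor("abc")
toy.my_iteration()
```

In this model do_select() is always finished before any callback runs, but the nesting depth of my_iteration() grows by one with each item that arrives during a callback, which is the recursion-depth concern mentioned earlier.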
_______________________________________________
viff-devel mailing list (http://viff.dk/)
viff-devel@viff.dk
http://lists.viff.dk/listinfo.cgi/viff-devel-viff.dk
