http://hg.viff.dk/viff/rev/89b93bc75476
changeset: 1185:89b93bc75476
user: Marcel Keller <[email protected]>
date: Wed May 20 19:34:03 2009 +0200
summary: Introduced complex callbacks.
These are only executed when process_deferred_queue() is not called recursively.
diffstat:
3 files changed, 38 insertions(+), 7 deletions(-)
apps/aes.py | 4 ++--
viff/aes.py | 4 ++--
viff/runtime.py | 37 ++++++++++++++++++++++++++++++++++---
diffs (98 lines):
diff -r 6cd5ceb87542 -r 89b93bc75476 apps/aes.py
--- a/apps/aes.py Tue May 19 17:04:07 2009 +0200
+++ b/apps/aes.py Wed May 20 19:34:03 2009 +0200
@@ -83,7 +83,7 @@
rt.shutdown()
g = gather_shares(opened_ciphertext)
- rt.schedule_callback(g, fin)
+ rt.schedule_complex_callback(g, fin)
def share_key(rt):
key = []
@@ -96,7 +96,7 @@
key.append(rt.input([inputter], GF256))
s = rt.synchronize()
- s.addCallback(encrypt, rt, key)
+ rt.schedule_complex_callback(s, encrypt, rt, key)
rt = create_runtime(id, players, 1, options)
rt.addCallback(share_key)
diff -r 6cd5ceb87542 -r 89b93bc75476 viff/aes.py
--- a/viff/aes.py Tue May 19 17:04:07 2009 +0200
+++ b/viff/aes.py Wed May 20 19:34:03 2009 +0200
@@ -377,9 +377,9 @@
trigger.addCallback(progress, i, time.time())
if (i < self.rounds - 1):
- self.runtime.schedule_callback(trigger, round, state, i +
1)
+ self.runtime.schedule_complex_callback(trigger, round,
state, i + 1)
else:
- self.runtime.schedule_callback(trigger, final_round, state)
+ self.runtime.schedule_complex_callback(trigger,
final_round, state)
prep_progress(i, start_round)
diff -r 6cd5ceb87542 -r 89b93bc75476 viff/runtime.py
--- a/viff/runtime.py Tue May 19 17:04:07 2009 +0200
+++ b/viff/runtime.py Wed May 20 19:34:03 2009 +0200
@@ -538,6 +538,7 @@
#: Queue of deferreds and data.
self.deferred_queue = []
+ self.complex_deferred_queue = []
#: Counter for calls of activate_reactor().
self.activation_counter = 0
#: Record the recursion depth.
@@ -628,6 +629,25 @@
return deferred.addCallback(callback_wrapper, *args, **kwargs)
+ def schedule_complex_callback(self, deferred, func, *args, **kwargs):
+ """Schedule a complex callback, i.e. a callback which blocks a
+ long time.
+
+ Consider that the deferred is forked, i.e. if the callback returns
+ something to be used afterwards, add further callbacks to the returned
+ deferred."""
+
+ if isinstance(deferred, Share):
+ fork = Share(deferred.runtime, deferred.field)
+ else:
+ fork = Deferred()
+
+ def queue_callback(result, runtime, fork):
+ runtime.complex_deferred_queue.append((fork, result))
+
+ deferred.addCallback(queue_callback, self, fork)
+ return self.schedule_callback(fork, func, *args, **kwargs)
+
@increment_pc
def synchronize(self):
"""Introduce a synchronization point.
@@ -778,10 +798,21 @@
self.deferred_queue.append((deferred, data))
def process_deferred_queue(self):
- """Execute the callbacks of the deferreds in the queue."""
+ """Execute the callbacks of the deferreds in the queue.
- while(self.deferred_queue):
- deferred, data = self.deferred_queue.pop(0)
+ If this function is not called via activate_reactor(), also
+ complex callbacks are executed."""
+
+ self.process_queue(self.deferred_queue)
+
+ if self.depth_counter == 0:
+ self.process_queue(self.complex_deferred_queue)
+
+ def process_queue(self, queue):
+ """Execute the callbacks of the deferreds in *queue*."""
+
+ while(queue):
+ deferred, data = queue.pop(0)
deferred.callback(data)
def activate_reactor(self):
_______________________________________________
viff-commits mailing list
[email protected]
http://lists.viff.dk/listinfo.cgi/viff-commits-viff.dk