/rev/96e0f7de22b0
changeset: 1319:96e0f7de22b0
user: Marcel Keller <[email protected]>
date: Fri Oct 02 11:27:08 2009 +0200
summary: Store data without deferreds in preprocessing pool.
diffstat:
viff/runtime.py | 33 ++++++++++++++-------------------
1 files changed, 14 insertions(+), 19 deletions(-)
diffs (56 lines):
diff -r 70c995d31d1a -r 96e0f7de22b0 viff/runtime.py
--- a/viff/runtime.py Fri Sep 25 20:30:05 2009 +0200
+++ b/viff/runtime.py Fri Oct 02 11:27:08 2009 +0200
@@ -446,7 +446,7 @@
def preprocess_wrapper(self, *args, **kwargs):
pc = tuple(self.program_counter)
try:
- return self._pool.pop(pc)
+ return succeed(self._pool.pop(pc))
except KeyError:
key = (generator, args)
pcs = self._needed_data.setdefault(key, [])
@@ -750,30 +750,25 @@
"""
def update(results, program_counters):
- # We concatenate the sub-lists in results.
- results = sum(results, [])
-
- # The pool must map program counters to Deferreds to
- # present a uniform interface for the functions we
- # pre-process.
- results = map(succeed, results)
-
# Update the pool with pairs of program counter and data.
self._pool.update(zip(program_counters, results))
- # Return a Deferred that waits on the individual results.
- # This is important to make it possible for the players to
- # avoid starting before the pre-processing is complete.
- return deep_wait(results)
-
+ wait_list = []
for ((generator, args), program_counters) in program.iteritems():
print "Preprocessing %s (%d items)" % (generator,
len(program_counters))
func = getattr(self, generator)
- results = []
- while len(results) < len(program_counters):
- results += func(*args)
- self._pool.update(zip(program_counters, results))
- return DeferredList(results).addCallback(lambda _: None)
+
+ block_size = 20
+ while program_counters:
+ results = []
+ while len(results) < len(program_counters) and \
+ len(results) < block_size:
+ results += func(*args)
+ ready = gatherResults(results)
+ ready.addCallback(update, program_counters[:len(results)])
+ del program_counters[:len(results)]
+ wait_list.append(ready)
+ return gatherResults(wait_list)
def input(self, inputters, field, number=None):
"""Input *number* to the computation.
_______________________________________________
viff-commits mailing list
viff-commits@viff.dk
http://lists.viff.dk/listinfo.cgi/viff-commits-viff.dk