/rev/79c351c812f3
changeset: 1280:79c351c812f3
user: Janus Dam Nielsen <[email protected]>
date: Fri Oct 16 13:59:19 2009 +0200
summary: Moved Benchmark classes to their own file.
diffstat:
apps/benchmark.py | 176 ++-----------------------------------------
apps/benchmark_classes.py | 186 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 196 insertions(+), 166 deletions(-)
diffs (418 lines):
diff -r 26f7a133172a -r 79c351c812f3 apps/benchmark.py
--- a/apps/benchmark.py Fri Oct 09 16:33:15 2009 +0200
+++ b/apps/benchmark.py Fri Oct 16 13:59:19 2009 +0200
@@ -58,16 +58,13 @@
from math import log
from optparse import OptionParser
import operator
-from pprint import pformat
import viff.reactor
viff.reactor.install()
from twisted.internet import reactor
-from twisted.internet.defer import Deferred
-from viff.field import GF, GF256, FakeGF
-from viff.runtime import Runtime, create_runtime, gather_shares, \
- make_runtime_class
+from viff.field import GF, FakeGF
+from viff.runtime import Runtime, create_runtime, make_runtime_class
from viff.passive import PassiveRuntime
from viff.active import BasicActiveRuntime, \
TriplesHyperinvertibleMatricesMixin, TriplesPRSSMixin
@@ -76,8 +73,10 @@
from viff.paillier import PaillierRuntime
from viff.orlandi import OrlandiRuntime
from viff.config import load_config
-from viff.util import find_prime, rand
+from viff.util import find_prime
+from benchmark_classes import SelfcontainedBenchmarkStrategy, \
+ NeededDataBenchmarkStrategy, ParallelBenchmark, SequentialBenchmark
# Hack in order to avoid Maximum recursion depth exceeded
# exception;
@@ -85,23 +84,6 @@
last_timestamp = time.time()
-start = 0
-
-
-def record_start(what):
- global start
- start = time.time()
- print "*" * 64
- print "Started", what
-
-
-def record_stop(x, what):
- stop = time.time()
- print
- print "Total time used: %.3f sec" % (stop-start)
- print "Time per %s operation: %.0f ms" % (what, 1000*(stop-start) / count)
- print "*" * 6
- return x
operations = {"mul": (operator.mul,[]),
"compToft05": (operator.ge, [ComparisonToft05Mixin]),
@@ -177,153 +159,15 @@
else:
Field = GF
+
Zp = Field(find_prime(options.modulus))
print "Using field elements (%d bit modulus)" % log(Zp.modulus, 2)
+
count = options.count
print "I am player %d, will %s %d numbers" % (id, options.operation, count)
-class BenchmarkStrategy:
-
- def benchmark(self, *args):
- raise NotImplemented("Override this abstract method in subclasses")
-
-
-class SelfcontainedBenchmarkStrategy(BenchmarkStrategy):
-
- def benchmark(self, *args):
- sys.stdout.flush()
- sync = self.rt.synchronize()
- self.doTest(sync, lambda x: x)
- self.rt.schedule_callback(sync, self.preprocess)
- self.doTest(sync, lambda x: self.rt.shutdown())
-
-
-class NeededDataBenchmarkStrategy(BenchmarkStrategy):
-
- def benchmark(self, needed_data, pc, *args):
- self.pc = pc
- sys.stdout.flush()
- sync = self.rt.synchronize()
- self.rt.schedule_callback(sync, lambda x: needed_data)
- self.rt.schedule_callback(sync, self.preprocess)
- self.doTest(sync, lambda x: self.rt.shutdown())
-
-
-# Defining the protocol as a class makes it easier to write the
-# callbacks in the order they are called. This class is a base class
-# that executes the protocol by calling the run_test method.
-class Benchmark:
-
- def __init__(self, rt, operation):
- self.rt = rt
- self.operation = operation
- self.pc = None
-
- def preprocess(self, needed_data):
- print "Preprocess", needed_data
- if needed_data:
- print "Starting preprocessing"
- record_start("preprocessing")
- preproc = self.rt.preprocess(needed_data)
- preproc.addCallback(record_stop, "preprocessing")
- return preproc
- else:
- print "Need no preprocessing"
- return None
-
- def doTest(self, d, termination_function):
- self.rt.schedule_callback(d, self.begin)
- self.rt.schedule_callback(d, self.sync_test)
- self.rt.schedule_callback(d, self.run_test)
- self.rt.schedule_callback(d, self.sync_test)
- self.rt.schedule_callback(d, self.finished, termination_function)
- return d
-
- def begin(self, _):
- print "begin", self.rt.program_counter
- print "Runtime ready, generating shares"
- self.a_shares = []
- self.b_shares = []
- for i in range(count):
- inputter = (i % len(self.rt.players)) + 1
- if inputter == self.rt.id:
- a = rand.randint(0, Zp.modulus)
- b = rand.randint(0, Zp.modulus)
- else:
- a, b = None, None
- self.a_shares.append(self.rt.input([inputter], Zp, a))
- self.b_shares.append(self.rt.input([inputter], Zp, b))
- shares_ready = gather_shares(self.a_shares + self.b_shares)
- return shares_ready
-
- def sync_test(self, x):
- print "Synchronizing test start."
- sys.stdout.flush()
- sync = self.rt.synchronize()
- self.rt.schedule_callback(sync, lambda y: x)
- return sync
-
- def run_test(self, _):
- raise NotImplemented("Override this abstract method in a sub class.")
-
- def finished(self, needed_data, termination_function):
- sys.stdout.flush()
-
- if self.rt._needed_data:
- print "Missing pre-processed data:"
- for (func, args), pcs in needed_data.iteritems():
- print "* %s%s:" % (func, args)
- print " " + pformat(pcs).replace("\n", "\n ")
-
- return termination_function(needed_data)
-
-# This class implements a benchmark where run_test executes all
-# operations in parallel.
-class ParallelBenchmark(Benchmark):
-
- def run_test(self, shares):
- print "rt", self.rt.program_counter, self.pc
- if self.pc != None:
- self.rt.program_counter = self.pc
- else:
- self.pc = list(self.rt.program_counter)
- c_shares = []
- record_start("parallel test")
- while self.a_shares and self.b_shares:
- a = self.a_shares.pop()
- b = self.b_shares.pop()
- c_shares.append(self.operation(a, b))
- print "."
-
- done = gather_shares(c_shares)
- done.addCallback(record_stop, "parallel test")
- def f(x):
- needed_data = self.rt._needed_data
- self.rt._needed_data = {}
- return needed_data
- done.addCallback(f)
- return done
-
-
-# A benchmark where the operations are executed one after each other.
-class SequentialBenchmark(Benchmark):
-
- def run_test(self, _, termination_function, d):
- record_start("sequential test")
- self.single_operation(None, termination_function)
-
- def single_operation(self, _, termination_function):
- if self.a_shares and self.b_shares:
- a = self.a_shares.pop()
- b = self.b_shares.pop()
- c = self.operation(a, b)
- self.rt.schedule_callback(c, self.single_operation,
termination_function)
- else:
- record_stop(None, "sequential test")
- self.finished(None, termination_function)
-
# Identify the base runtime class.
base_runtime_class = runtimes[options.runtime]
@@ -379,10 +223,10 @@
pre_runtime.addCallback(update_args, options)
-def do_benchmark(runtime, operation, benchmark, *args):
- benchmark(runtime, operation).benchmark(*args)
+def do_benchmark(runtime, operation, benchmark, field, count, *args):
+ benchmark(runtime, operation, field, count).benchmark(*args)
-pre_runtime.addCallback(do_benchmark, operation, benchmark, needed_data,
options.pc)
+pre_runtime.addCallback(do_benchmark, operation, benchmark, Zp, count,
needed_data, options.pc)
print "#### Starting reactor ###"
reactor.run()
diff -r 26f7a133172a -r 79c351c812f3 apps/benchmark_classes.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/apps/benchmark_classes.py Fri Oct 16 13:59:19 2009 +0200
@@ -0,0 +1,186 @@
+# Copyright 2009 VIFF Development Team.
+#
+# This file is part of VIFF, the Virtual Ideal Functionality Framework.
+#
+# VIFF is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License (LGPL) as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# VIFF is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+import time
+
+from pprint import pformat
+
+from viff.runtime import gather_shares
+from viff.util import rand
+
+start = 0
+
+
+def record_start(what):
+    """Note the current time and announce that `what` has started.
+
+    The timestamp is kept in the module-level ``start`` variable, which
+    record_stop() reads to work out the elapsed time. Only one
+    measurement can therefore be in progress at a time.
+    """
+    global start
+    banner = "*" * 64
+    start = time.time()
+    print banner
+    print "Started", what
+
+
+def record_stop(x, what, count):
+    """Print the time that has passed since the last record_start().
+
+    `what` names the measured activity and `count` is the number of
+    operations the total time is divided by. `x` is handed back
+    untouched, which lets this function be registered with addCallback
+    without disturbing the value travelling down the callback chain.
+    """
+    elapsed = time.time() - start
+    print
+    print "Total time used: %.3f sec" % elapsed
+    print "Time per %s operation: %.0f ms" % (what, 1000*elapsed / count)
+    print "*" * 6
+    return x
+
+
+# Defining the protocol as a class makes it easier to write the
+# callbacks in the order they are called. This class is a base class
+# that executes the protocol by calling the run_test method.
+class Benchmark:
+    """Base class for a benchmark of one two-argument operation.
+
+    Subclasses provide run_test(); doTest() chains the individual steps
+    (begin, sync_test, run_test, sync_test, finished) onto a deferred in
+    the order in which they are to run.
+    """
+
+    def __init__(self, rt, operation, field, count):
+        """Store the benchmark parameters.
+
+        rt is the runtime the benchmark runs on, operation is called
+        with two shares per measured operation, field is the field the
+        inputs are drawn from and shared over, and count is the number
+        of operations to perform.
+        """
+        self.rt = rt
+        self.operation = operation
+        # Program counter to run the test at; None until a subclass or
+        # a strategy sets it.
+        self.pc = None
+        self.field = field
+        self.count = count
+
+    def preprocess(self, needed_data):
+        """Run and time the preprocessing described by needed_data.
+
+        Returns whatever self.rt.preprocess() returns (with the timing
+        callback attached), or None when needed_data is empty.
+        """
+        print "Preprocess", needed_data
+        if needed_data:
+            print "Starting preprocessing"
+            record_start("preprocessing")
+            preproc = self.rt.preprocess(needed_data)
+            preproc.addCallback(record_stop, "preprocessing", self.count)
+            return preproc
+        else:
+            print "Need no preprocessing"
+            return None
+
+    def doTest(self, d, termination_function):
+        """Schedule one complete test run on the deferred d.
+
+        termination_function receives the result of the run as the very
+        last step. Returns d.
+        """
+        self.rt.schedule_callback(d, self.begin)
+        self.rt.schedule_callback(d, self.sync_test)
+        self.rt.schedule_callback(d, self.run_test)
+        self.rt.schedule_callback(d, self.sync_test)
+        self.rt.schedule_callback(d, self.finished, termination_function)
+        return d
+
+    def begin(self, _):
+        """Create self.count pairs of input shares.
+
+        The players take turns providing the inputs: pair i is input by
+        player (i % number of players) + 1. Returns the result of
+        gather_shares() over all the shares created.
+        """
+        print "begin", self.rt.program_counter
+        print "Runtime ready, generating shares"
+        self.a_shares = []
+        self.b_shares = []
+        for i in range(self.count):
+            inputter = (i % len(self.rt.players)) + 1
+            if inputter == self.rt.id:
+                a = rand.randint(0, self.field.modulus)
+                b = rand.randint(0, self.field.modulus)
+            else:
+                # Only the inputting player supplies actual values.
+                a, b = None, None
+            self.a_shares.append(self.rt.input([inputter], self.field, a))
+            self.b_shares.append(self.rt.input([inputter], self.field, b))
+        shares_ready = gather_shares(self.a_shares + self.b_shares)
+        return shares_ready
+
+    def sync_test(self, x):
+        """Synchronize the players and pass x on to the next callback."""
+        print "Synchronizing test start."
+        sys.stdout.flush()
+        sync = self.rt.synchronize()
+        # Replace the result of the synchronization with x, so that the
+        # step following sync_test sees the value produced before it.
+        self.rt.schedule_callback(sync, lambda y: x)
+        return sync
+
+    def run_test(self, _):
+        """Perform the measured operations; must be overridden."""
+        # NotImplemented is a constant meant for rich comparisons and is
+        # not callable; NotImplementedError is the exception to raise.
+        raise NotImplementedError(
+            "Override this abstract method in a sub class.")
+
+    def finished(self, needed_data, termination_function):
+        """Report missing preprocessed data and end the test run.
+
+        Returns the result of termination_function(needed_data).
+        """
+        sys.stdout.flush()
+
+        # NOTE(review): the condition looks at self.rt._needed_data while
+        # the loop walks the needed_data argument -- confirm that this is
+        # intended.
+        if self.rt._needed_data:
+            print "Missing pre-processed data:"
+            # needed_data is None when called from
+            # SequentialBenchmark.single_operation, so guard the loop.
+            for (func, args), pcs in (needed_data or {}).iteritems():
+                print "* %s%s:" % (func, args)
+                print " " + pformat(pcs).replace("\n", "\n ")
+
+        return termination_function(needed_data)
+
+
+# This class implements a benchmark where run_test executes all
+# operations in parallel.
+class ParallelBenchmark(Benchmark):
+    """Benchmark that starts every operation before waiting for any."""
+
+    def run_test(self, shares):
+        """Apply the operation to all share pairs and time the batch.
+
+        The shares argument is not used; the pairs are taken from
+        self.a_shares and self.b_shares, which are emptied in the
+        process. Returns the gathered result, which ends up carrying the
+        runtime's record of needed preprocessed data.
+        """
+        print "rt", self.rt.program_counter, self.pc
+        # Compare with None by identity rather than with "!=".
+        if self.pc is not None:
+            # Run at the program counter recorded earlier.
+            self.rt.program_counter = self.pc
+        else:
+            self.pc = list(self.rt.program_counter)
+        c_shares = []
+        record_start("parallel test")
+        while self.a_shares and self.b_shares:
+            a = self.a_shares.pop()
+            b = self.b_shares.pop()
+            c_shares.append(self.operation(a, b))
+            # NOTE(review): the archived text lost its indentation; this
+            # is assumed to be a per-operation progress mark inside the
+            # loop -- confirm against the repository.
+            print "."
+
+        done = gather_shares(c_shares)
+        done.addCallback(record_stop, "parallel test", self.count)
+
+        def take_needed_data(_):
+            # Hand over what the runtime recorded and reset its record.
+            needed_data = self.rt._needed_data
+            self.rt._needed_data = {}
+            return needed_data
+        done.addCallback(take_needed_data)
+        return done
+
+
+# A benchmark where the operations are executed one after another.
+class SequentialBenchmark(Benchmark):
+    """Benchmark in which an operation is only started once the result
+    of the previous one has been scheduled back into single_operation."""
+
+    # NOTE(review): Benchmark.doTest schedules run_test without extra
+    # arguments, whereas this override requires termination_function
+    # and d (d is never used). It also returns nothing for the callback
+    # chain to wait on. It looks like this class cannot be driven
+    # through doTest as written -- confirm against the callers.
+    def run_test(self, _, termination_function, d):
+        """Start the timer and kick off the first operation."""
+        record_start("sequential test")
+        self.single_operation(None, termination_function)
+
+    def single_operation(self, _, termination_function):
+        """Perform one operation, or wrap up when no share pairs remain."""
+        if self.a_shares and self.b_shares:
+            a = self.a_shares.pop()
+            b = self.b_shares.pop()
+            c = self.operation(a, b)
+            # Continue with the next pair via a callback on c.
+            self.rt.schedule_callback(c, self.single_operation,
+                                      termination_function)
+        else:
+            record_stop(None, "sequential test", self.count)
+            # No needed-data record is passed on here, hence the None.
+            self.finished(None, termination_function)
+
+
+class BenchmarkStrategy:
+    """Interface for the ways in which a benchmark can be driven."""
+
+    def benchmark(self, *args):
+        """Run the benchmark; must be overridden in subclasses."""
+        # NotImplemented is a non-callable constant used by rich
+        # comparisons; calling it raises TypeError. The exception meant
+        # for abstract methods is NotImplementedError.
+        raise NotImplementedError(
+            "Override this abstract method in subclasses")
+
+
+class SelfcontainedBenchmarkStrategy(BenchmarkStrategy):
+    """Strategy that performs a test run, preprocesses with its result,
+    and then performs a second test run before shutting down.
+
+    The class uses self.rt, self.doTest and self.preprocess, none of
+    which it defines itself; presumably it is combined with a Benchmark
+    subclass that provides them.
+    """
+
+    def benchmark(self, *args):
+        """Schedule both test runs and the preprocessing between them."""
+        def pass_through(result):
+            return result
+
+        def shut_down(_ignored):
+            return self.rt.shutdown()
+
+        sys.stdout.flush()
+        barrier = self.rt.synchronize()
+        # First run: its result is passed on unchanged to preprocess.
+        self.doTest(barrier, pass_through)
+        self.rt.schedule_callback(barrier, self.preprocess)
+        # Second run: ends by shutting the runtime down.
+        self.doTest(barrier, shut_down)
+
+
+class NeededDataBenchmarkStrategy(BenchmarkStrategy):
+    """Strategy that preprocesses from a given description of the needed
+    data and then performs a single test run before shutting down.
+
+    The class uses self.rt, self.doTest and self.preprocess, none of
+    which it defines itself; presumably it is combined with a Benchmark
+    subclass that provides them.
+    """
+
+    def benchmark(self, needed_data, pc, *args):
+        """Schedule the preprocessing of needed_data and the test run.
+
+        pc is stored in self.pc before anything is scheduled.
+        """
+        def supply_needed_data(_ignored):
+            return needed_data
+
+        def shut_down(_ignored):
+            return self.rt.shutdown()
+
+        self.pc = pc
+        sys.stdout.flush()
+        barrier = self.rt.synchronize()
+        # Feed the supplied needed_data into the preprocess step.
+        self.rt.schedule_callback(barrier, supply_needed_data)
+        self.rt.schedule_callback(barrier, self.preprocess)
+        self.doTest(barrier, shut_down)
_______________________________________________
viff-commits mailing list
[email protected]
http://lists.viff.dk/listinfo.cgi/viff-commits-viff.dk