# HG changeset patch # User Thomas Pelle Jakobsen <[EMAIL PROTECTED]> # Date 1226015551 -3600 # Node ID 1e8dec2648c1486537076da790745074218cdc78 # Parent 693761d8181f39ccbbf699b222d97c761127b461 Added VIFF benchmark class.
diff -r 693761d8181f -r 1e8dec2648c1 apps/benchmark/viff_benchmark.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/apps/benchmark/viff_benchmark.py Fri Nov 07 00:52:31 2008 +0100 @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2007, 2008 VIFF Development Team. +# +# This file is part of VIFF, the Virtual Ideal Functionality Framework. +# +# VIFF is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License (LGPL) as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# VIFF is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with VIFF. If not, see <http://www.gnu.org/licenses/>. + +import time +import copy +import os +import select +import sys +import subprocess +import re +from optparse import OptionParser +from viff.runtime import Runtime, create_runtime +from viff.config import load_config +from twisted.internet import reactor +from database import Database +from util import exec_on_host +from benchmark import Benchmark, parse_args + +class VIFFBenchmark(Benchmark): + """Extend VIFFBenchmark to benchmark distributed VIFF protocols. The needed + VIFF players are then automatically set up on remote hosts. + + A VIFFBenchmark object has predefined methods for generating config + files, certificates, and other VIFF-specific tasks. + + See examples/ComparisonToft05.py for example usage. + """ + + def setup_once(self, suite): + """Do VIFF related setup for whole suite.""" + Benchmark.setup_once(self, suite) + print "Setup once in VIFFBenchmark" + + # TODO: This is code duplicated from Benchmark. How to avoid that? + filename = str(self.__class__).split('.')[-1] + if sys.platform == 'win32': + scp = "pscp" + else: + scp = "scp" + hostname = suite.host_name.values()[0] # Execute this on arbitrary host + + subprocess.call([scp, "viff_benchmark.py", suite.user + "@" + hostname + + ":" + suite.work_dir]) + subprocess.call([scp, "../generate-config-files.py", suite.user + + "@" + hostname + ":" + suite.work_dir]) + subprocess.call([scp, "../generate-certificates.py", suite.user + + "@" + hostname + ":" + suite.work_dir]) + if self.attr['use_ssl'] == "True": + self.generateCertificates(suite) + self.generateConfigFiles(suite) + + + def viff_options(self): + """ Util function that returns the basic viff options, + e.g. use-ssl, profile, etc.""" + parser = OptionParser() + Runtime.add_options(parser) + args = [] + if self.attr['use_ssl']: + args.append('--ssl') + else: + args.append('--no-ssl') + options, _ = parser.parse_args(args) + return options + + def create_pre_runtime(self, player_id, players): + return create_runtime(player_id, players, self.attr['threshold'], + self.viff_options(), + runtime_class=self.get_runtime_class()) + + def get_runtime_class(self): + """ Override this method to create other than the basic runtime. + Returning None will result in the standard runtime being used.""" + return Runtime + + def protocol(self, rt, attr): + """ Returns a deferred where previous callback will return a + dict with attr name as keys and attr values as values.""" + raise NotImplemented("Override this abstract method in a subclass.") + + def err_handler(self, err): + print "Exception:", err.__class__ + print str(err) + print "Stopping reactor in an unclean fashion" + reactor.stop() + sys.exit(42) # TODO: This doesn't work either! + # TODO: If rt has been created, then rather call + # rt.shutdown() + + # Convenience method called from setup_once, e.g. on 'master' host + def generateCertificates(self, suite): + cmd = ["hostname; cd %s; python ./generate-certificates.py -n %d" % + (suite.work_dir, self.attr['n'])] + hostname = suite.host_name.values()[0] # Execute this on arbitrary host + exec_on_host(suite.user, hostname, cmd) + + # Convenience method called from setup_once, e.g. on 'master' host + def generateConfigFiles(self, suite): + cmd = ["echo $PYTHONPATH; echo $HOME; hostname; python -V; " \ + "cd %s; python ./generate-config-files.py" % suite.work_dir, \ + "-n", str(self.attr['n']), "-t", str(self.attr['threshold'])] + for host_id in suite.host_name.keys()[:self.attr['n']]: + cmd.append("%s:%d" % + (suite.host_name[host_id],suite.host_port[host_id])) + hostname = suite.host_name.values()[0] # Execute this on arbitrary host + exec_on_host(suite.user, hostname, cmd) + + + + def run_on_slave(self): + self.setup(self.attr) + player_id, players = load_config("player-%d.ini" % + self.attr['player_id']) + pre_runtime = self.create_pre_runtime(self.attr['player_id'], players) + results = [] + + # We schedule all protocol runs in one reactor. + # TODO: This might fill up stack with deferreds, + # so we need some way of inserting a break in Twisted?! + + def schedule_runs(rt, attr): + sync = rt.synchronize() + for run in range(attr['runs']): + sync.addCallback(lambda _: self.protocol(rt, attr)) + sync.addCallback(lambda res: results.append(res)) + # TODO: Only need synchronize when doing tests of time, + # bandwidth, etc. + sync.addCallback(lambda _: rt.synchronize()) + sync.addCallback(lambda _: rt.shutdown()) + + pre_runtime.addCallback(schedule_runs, self.attr) + pre_runtime.addErrback(self.err_handler) # Need this? + reactor.run() + self.report_result(results) + self.teardown(self.attr) + sys.exit(0) + + def cleanup(self, suite, procs): + """If any process is still alive, kill it.""" + # TODO: Hack -> All python processes owned by user is killed. + # Rather, save pid and use it here. + for host_id, p in procs.items(): + if not p.returncode: + print "Sending SIGKILL to " + suite.host_name[host_id] + exec_on_host(suite.user, + suite.host_name[host_id], + ["hostname; pkill -9 -u " + suite.user + + " -x python; ps -u " + suite.user]) + sys.exit(117) # TODO: What is the exit code strategy? + + def wait_for_benchmark(self, suite, procs, poll_interval=1): + """ Takes as input a dictionary with host_ids as keys and popen objects + for the running processes as values. Blocks until all remote processes + have finished with returncode zero. All output from the remote + processes is forwarded to stdout (incl. errors). If at least one remote + process returns a non-zero returncode, an exception is raised + containing a list of hostnames that have not yet finished.""" + # Take a shallow copy of the dict so we can remove objects + jobs = copy.copy(procs) + hostname = {} # host process id -> host name + hostid = {} # host process id -> host id + for host_id, job in jobs.items(): + hostname[job.stdout.fileno()] = suite.host_name[host_id] + hostid[job.stdout.fileno()] = host_id + while len(jobs) > 0: + # TODO: This will not work on Windows? In order to fix this, + # have a look at http://code.activestate.com/recipes/440554/. + active = [job.stdout.fileno() for job in jobs.values()] + ready = select.select(active, [], [], poll_interval)[0] + for fd in ready: + # Don't use out.stdout.read() since select() is not aware of + # its buffers. + output = os.read(fd, 4096) # TODO: Too low max value here? + print "-----" + hostname[fd] + "--------------------:" + print output + print "-------------------------------------------" + # TODO: Do we need to dispose the read data somehow? + # TODO: We assume here that whole lines are read! + m = re.search('COMPLETED WITH RETURNCODE (\d)$', output, + re.MULTILINE) + if m: + print "removing host " + hostname[fd] + del jobs[hostid[fd]] + returncode = int(m.group(1)) + if not returncode == 0: + raise Exception(hostname[fd] + + " failed with returncode " + + str(returncode), jobs) + + + # TODO: Would be nice to refactor by having + # Benchmark -> DistributedBenchmark -> VIFFBenchmark + # and to move the logic like this for running the benchmarks to + # the respective classes. + def run_benchmark(self, suite): + """ Starts a benchmark on the remote hosts and returns a dictionary + with host_id as keys and running popen objects as values.""" + player_id = self.attr['n'] + procs = {} + host_ids = suite.host_name.keys()[:self.attr['n']] + host_ids.reverse() + for host_id in host_ids: + # TODO: Hack -> Benchmarks must then exist only in files with same + # name. + filename = str(self.__class__).split('.')[-1] + ".py" + # TODO: Would like to run ./benchmarkname.py instead, but that + # doesn't work on Windows + + command = "cd %s; python -u %s" % (suite.work_dir, filename) + + # Hosts are given player_id's from 1 and up + # TODO: How to append in a nice way to strings? + command = command + " player_id=%d" % player_id + command = command + " host_id=%d" % host_id + + # TTP with highest player_id must start, etc. + player_id = player_id - 1 + for attr, val in self.attr.items(): + command = command + " " + attr + "=" + str(val) + # TODO: Ugly HACK! Need this since ssh via subprocess doesn't + # stop. + command = command + "; echo COMPLETED WITH RETURNCODE $?" + procs[host_id] = exec_on_host(suite.user, suite.host_name[host_id], + [command], sync=False) + # TODO: Hack -> Two second to start twisted... should + # rather wait for some safe sign from process showing that + # the reactor has started! + time.sleep(2) + return procs + + def run_on_master(self, suite): + self.setup_once(suite) + procs = self.run_benchmark(suite) + try: + self.wait_for_benchmark(suite, procs) + except KeyboardInterrupt: + print "Ctrl-C detected, cleaning up..." + self.cleanup(suite, procs) + except Exception, ex: + print str(ex) + print "Cleaning up..." + self.cleanup(suite, procs) _______________________________________________ viff-patches mailing list [email protected] http://lists.viff.dk/listinfo.cgi/viff-patches-viff.dk
