# HG changeset patch
# User Thomas Pelle Jakobsen <[EMAIL PROTECTED]>
# Date 1226015551 -3600
# Node ID 1e8dec2648c1486537076da790745074218cdc78
# Parent  693761d8181f39ccbbf699b222d97c761127b461
Added VIFF benchmark class.

diff -r 693761d8181f -r 1e8dec2648c1 apps/benchmark/viff_benchmark.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/apps/benchmark/viff_benchmark.py  Fri Nov 07 00:52:31 2008 +0100
@@ -0,0 +1,258 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2007, 2008 VIFF Development Team.
+#
+# This file is part of VIFF, the Virtual Ideal Functionality Framework.
+#
+# VIFF is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License (LGPL) as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# VIFF is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+import copy
+import os
+import select
+import sys
+import subprocess
+import re
+from optparse import OptionParser
+from viff.runtime import Runtime, create_runtime
+from viff.config import load_config
+from twisted.internet import reactor
+from database import Database
+from util import exec_on_host
+from benchmark import Benchmark, parse_args
+
+class VIFFBenchmark(Benchmark):
+    """Extend VIFFBenchmark to benchmark distributed VIFF protocols. The needed
+    VIFF players are then automatically set up on remote hosts.
+    
+    A VIFFBenchmark object has predefined methods for generating config
+    files, certificates, and other VIFF-specific tasks.
+    
+    See examples/ComparisonToft05.py for example usage.
+    """
+    
+    def setup_once(self, suite):
+        """Do VIFF related setup for whole suite.
+
+        Runs the base class setup, copies viff_benchmark.py and the two
+        generator scripts to the work directory on one host (using pscp on
+        win32 and scp elsewhere), generates certificates there when the
+        'use_ssl' attribute equals "True", and finally generates the
+        player config files.
+        """
+        Benchmark.setup_once(self, suite)
+        print "Setup once in VIFFBenchmark"
+
+        # TODO: This is code duplicated from Benchmark. How to avoid that?
+        if sys.platform == 'win32':
+            scp = "pscp"
+        else:
+            scp = "scp"
+        hostname = suite.host_name.values()[0] # Execute this on arbitrary host
+
+        # All three files go to the same user@host:work_dir destination.
+        target = suite.user + "@" + hostname + ":" + suite.work_dir
+        for script in ("viff_benchmark.py",
+                       "../generate-config-files.py",
+                       "../generate-certificates.py"):
+            subprocess.call([scp, script, target])
+
+        # NOTE(review): use_ssl is compared with the string "True" here but
+        # only tested for truthiness in viff_options() -- confirm which
+        # form the attribute really has.
+        if self.attr['use_ssl'] == "True":
+            self.generateCertificates(suite)
+        self.generateConfigFiles(suite)
+        
+        
+    def viff_options(self):
+        """Build the basic VIFF runtime options for this benchmark.
+
+        An option parser is populated by Runtime.add_options and then fed
+        a single flag, '--ssl' or '--no-ssl', selected by the truth value
+        of the 'use_ssl' attribute. Returns the parsed options object.
+        """
+        option_parser = OptionParser()
+        Runtime.add_options(option_parser)
+        ssl_flag = '--no-ssl'
+        if self.attr['use_ssl']:
+            ssl_flag = '--ssl'
+        # parse_args returns (options, leftover arguments); only the
+        # options are of interest.
+        return option_parser.parse_args([ssl_flag])[0]
+    
+    def create_pre_runtime(self, player_id, players):
+        """Create the runtime for one player through create_runtime.
+
+        The threshold is taken from the 'threshold' attribute, the options
+        from viff_options() and the runtime class from
+        get_runtime_class(). Whatever create_runtime returns is handed
+        back unchanged.
+        """
+        threshold = self.attr['threshold']
+        options = self.viff_options()
+        runtime_class = self.get_runtime_class()
+        return create_runtime(player_id, players, threshold, options,
+                              runtime_class=runtime_class)
+    
+    def get_runtime_class(self):
+        """Return the runtime class that is passed on to create_runtime.
+
+        Override this method to create other than the basic runtime.
+        Returning None will result in the standard runtime being used.
+        """
+        runtime_class = Runtime
+        return runtime_class
+    
+    def protocol(self, rt, attr):
+        """Run the protocol to benchmark; abstract, override in a subclass.
+
+        Implementations return a deferred where the previous callback
+        returns a dict with attr names as keys and attr values as values.
+
+        Raises NotImplementedError when called without being overridden.
+        """
+        # NotImplemented is the comparison sentinel and is not callable;
+        # NotImplementedError is the exception meant for abstract methods.
+        raise NotImplementedError("Override this abstract method in a "
+                                  "subclass.")
+        
+    def err_handler(self, err):
+        """Print the error, stop the reactor and exit with code 42."""
+        print "Exception: %s" % (err.__class__,)
+        print str(err)
+        print "Stopping reactor in an unclean fashion"
+        reactor.stop()
+        # TODO: This doesn't work either!
+        # TODO: If rt has been created, then rather call
+        # rt.shutdown()
+        sys.exit(42)
+        
+    # Convenience method called from setup_once, e.g. on 'master' host
+    def generateCertificates(self, suite):
+        """Run generate-certificates.py in the work directory on one host.
+
+        The script gets -n set to the 'n' attribute and is executed, via
+        exec_on_host, on the first host name listed in the suite.
+        """
+        template = "hostname; cd %s; python ./generate-certificates.py -n %d"
+        shell_line = template % (suite.work_dir, self.attr['n'])
+        # Execute this on arbitrary host
+        master = suite.host_name.values()[0]
+        exec_on_host(suite.user, master, [shell_line])
+        
+    # Convenience method called from setup_once, e.g. on 'master' host
+    def generateConfigFiles(self, suite):
+        """Run generate-config-files.py in the work directory on one host.
+
+        The command first echoes some diagnostics (PYTHONPATH, HOME, host
+        name, Python version) and then calls the script with -n and -t
+        taken from the 'n' and 'threshold' attributes, followed by one
+        "host:port" argument for each of the first n host ids.
+        """
+        prelude = ("echo $PYTHONPATH; echo $HOME; hostname; python -V; "
+                   "cd %s; python ./generate-config-files.py" % suite.work_dir)
+        cmd = [prelude,
+               "-n", str(self.attr['n']),
+               "-t", str(self.attr['threshold'])]
+        chosen = suite.host_name.keys()[:self.attr['n']]
+        cmd.extend(["%s:%d" % (suite.host_name[host_id],
+                               suite.host_port[host_id])
+                    for host_id in chosen])
+        # Execute this on arbitrary host
+        master = suite.host_name.values()[0]
+        exec_on_host(suite.user, master, cmd)
+        
+    
+    
+    def run_on_slave(self):
+        """Run the benchmark for one player and exit the process.
+
+        Calls setup, loads "player-<player_id>.ini", creates the runtime
+        and schedules attr['runs'] protocol runs on it. After reactor.run()
+        returns, the collected results are reported, teardown is called
+        and the process exits with code 0.
+        """
+        self.setup(self.attr)
+        # The player_id returned by load_config is not used below; the
+        # 'player_id' attribute is used instead.
+        player_id, players = load_config("player-%d.ini" %
+                                         self.attr['player_id'])
+        pre_runtime = self.create_pre_runtime(self.attr['player_id'], players)
+        # Filled by the callbacks below, one entry per protocol run.
+        results = []
+        
+        # We schedule all protocol runs in one reactor.
+        # TODO: This might fill up stack with deferreds,
+        # so we need some way of inserting a break in Twisted?!
+   
+        def schedule_runs(rt, attr):
+            # Build one callback chain: synchronize, then for every run
+            # execute the protocol, store its result and synchronize again.
+            sync = rt.synchronize()
+            for run in range(attr['runs']):
+                sync.addCallback(lambda _: self.protocol(rt, attr))
+                sync.addCallback(lambda res: results.append(res))
+                # TODO: Only need synchronize when doing tests of time,
+                # bandwidth, etc.
+                sync.addCallback(lambda _: rt.synchronize())
+            # Presumably shutting down the runtime also stops the reactor,
+            # which lets reactor.run() below return -- TODO confirm.
+            sync.addCallback(lambda _: rt.shutdown())
+        
+        pre_runtime.addCallback(schedule_runs, self.attr)
+        pre_runtime.addErrback(self.err_handler) # Need this?
+        reactor.run()
+        self.report_result(results)
+        self.teardown(self.attr)
+        sys.exit(0)
+        
+    def cleanup(self, suite, procs):
+        """If any process is still alive, kill it; then exit with code 117.
+
+        procs maps host ids to the popen objects started for them. For
+        every popen object that has not terminated, a pkill -9 of the
+        user's python processes is executed on the corresponding host.
+        """
+        # TODO: Hack -> All python processes owned by user is killed.
+        #       Rather, save pid and use it here.
+        for host_id, p in procs.items():
+            # returncode is only refreshed by poll()/wait() and is 0 after a
+            # clean exit, so "not p.returncode" cannot tell a live process
+            # from a finished one. poll() returns None only while it runs.
+            if p.poll() is None:
+                host = suite.host_name[host_id]
+                print "Sending SIGKILL to " + host
+                exec_on_host(suite.user,
+                             host,
+                             ["hostname; pkill -9 -u " + suite.user +
+                              " -x python; ps -u " + suite.user])
+        sys.exit(117) # TODO: What is the exit code strategy?
+            
+    def wait_for_benchmark(self, suite, procs, poll_interval=1):
+        """Block until all remote processes have finished with returncode 0.
+
+        Takes as input a dictionary with host_ids as keys and popen objects
+        for the running processes as values. Everything read from the
+        stdout pipes of these objects is forwarded to stdout. A host counts
+        as finished when its output contains a line ending in
+        'COMPLETED WITH RETURNCODE <code>'.
+
+        Raises Exception if a host reports a non-zero returncode or closes
+        its output without reporting one. The second exception argument is
+        the dictionary of jobs that have not yet finished.
+        """
+        # Take a shallow copy of the dict so we can remove objects
+        jobs = copy.copy(procs)
+        hostname = {} # stdout file descriptor -> host name
+        hostid = {} # stdout file descriptor -> host id
+        for host_id, job in jobs.items():
+            hostname[job.stdout.fileno()] = suite.host_name[host_id]
+            hostid[job.stdout.fileno()] = host_id
+        # Return codes may have several digits (this module itself exits
+        # with 42 and 117), so match all of them. The \s* tolerates a
+        # trailing carriage return.
+        completed = re.compile(r'COMPLETED WITH RETURNCODE (\d+)\s*$',
+                               re.MULTILINE)
+        while len(jobs) > 0:
+            # TODO: This will not work on Windows? In order to fix this,
+            #       have a look at http://code.activestate.com/recipes/440554/.
+            active = [job.stdout.fileno() for job in jobs.values()]
+            ready = select.select(active, [], [], poll_interval)[0]
+            for fd in ready:
+                # Don't use out.stdout.read() since select() is not aware of
+                # its buffers.
+                output = os.read(fd, 4096) # TODO: Too low max value here?
+                if not output:
+                    # End of file: the pipe stays "ready" forever, so without
+                    # this check the loop would spin on empty reads.
+                    del jobs[hostid[fd]]
+                    raise Exception(hostname[fd] +
+                                    " closed its output without reporting"
+                                    " a returncode", jobs)
+                print "-----" + hostname[fd] + "--------------------:"
+                print output
+                print "-------------------------------------------"
+                # TODO: Do we need to dispose the read data somehow?
+                # TODO: We assume here that whole lines are read!
+                m = completed.search(output)
+                if m:
+                    print "removing host " + hostname[fd]
+                    del jobs[hostid[fd]]
+                    returncode = int(m.group(1))
+                    if not returncode == 0:
+                        raise Exception(hostname[fd] +
+                                        " failed with returncode " +
+                                        str(returncode), jobs)
+
+
+    # TODO: Would be nice to refactor by having
+    # Benchmark -> DistributedBenchmark -> VIFFBenchmark
+    # and to move the logic like this for running the benchmarks to 
+    # the respective classes.
+    def run_benchmark(self, suite):
+        """Start the benchmark on the remote hosts.
+
+        The first n host ids of the suite are visited in reverse order and
+        get the player ids n, n-1, ..., so the player with the highest id
+        is started first. Each host runs this benchmark's file in the work
+        directory with player_id, host_id and all attributes passed as
+        name=value arguments. Returns a dictionary with host_id as keys
+        and the running popen objects as values.
+        """
+        n = self.attr['n']
+        # TODO: Hack -> Benchmarks must then exist only in files with same
+        # name.
+        # TODO: Would like to run ./benchmarkname.py instead, but that
+        # doesn't work on Windows
+        filename = str(self.__class__).split('.')[-1] + ".py"
+        host_ids = suite.host_name.keys()[:n][::-1]
+        procs = {}
+        for offset, host_id in enumerate(host_ids):
+            # Hosts are given player_id's from 1 and up.
+            # TTP with highest player_id must start, etc.
+            pieces = ["cd %s; python -u %s" % (suite.work_dir, filename),
+                      " player_id=%d" % (n - offset),
+                      " host_id=%d" % host_id]
+            for attr, val in self.attr.items():
+                pieces.append(" " + attr + "=" + str(val))
+            # TODO: Ugly HACK! Need this since ssh via subprocess doesn't
+            # stop.
+            pieces.append("; echo COMPLETED WITH RETURNCODE $?")
+            procs[host_id] = exec_on_host(suite.user, suite.host_name[host_id],
+                                          ["".join(pieces)], sync=False)
+            # TODO: Hack -> Two second to start twisted... should
+            # rather wait for some safe sign from process showing that
+            # the reactor has started!
+            time.sleep(2)
+        return procs
+        
+    def run_on_master(self, suite):
+        """Set up the suite, start the remote runs and wait for them.
+
+        If waiting is interrupted by Ctrl-C or fails with an exception, a
+        message is printed and cleanup() is called on the started
+        processes.
+        """
+        self.setup_once(suite)
+        running = self.run_benchmark(suite)
+        try:
+            self.wait_for_benchmark(suite, running)
+        except KeyboardInterrupt:
+            print "Ctrl-C detected, cleaning up..."
+            self.cleanup(suite, running)
+        except Exception, failure:
+            print str(failure)
+            print "Cleaning up..."
+            self.cleanup(suite, running)
_______________________________________________
viff-patches mailing list
[email protected]
http://lists.viff.dk/listinfo.cgi/viff-patches-viff.dk

Reply via email to