# HG changeset patch
# User Janus Dam Nielsen <[email protected]>
# Date 1254816324 -7200
# Node ID 53d198cdf14cb85d81d6e6cf1ec75a16773dfbec
# Parent 5a815629d825c6eae85cacfe821e8945a232ebcd
An implementation of a broadcast mixin based on hashing of the received messages.
diff --git a/viff/constants.py b/viff/constants.py
--- a/viff/constants.py
+++ b/viff/constants.py
@@ -25,3 +25,10 @@
 READY = 2
 SEND = 3
 PAILLIER = 4
+TEXT = 5
+
+# Used by the HashBroadcastMixin
+INCONSISTENTHASH = 6
+OK = 7
+HASH = 8
+SIGNAL = 9
diff --git a/viff/hash_broadcast.py b/viff/hash_broadcast.py
new file mode 100644
--- /dev/null
+++ b/viff/hash_broadcast.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python
+
+# Copyright 2009 VIFF Development Team.
+#
+# This file is part of VIFF, the Virtual Ideal Functionality Framework.
+#
+# VIFF is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License (LGPL) as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# VIFF is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.internet.defer import gatherResults, Deferred, DeferredList
+
+from hashlib import sha1
+from viff.constants import TEXT, INCONSISTENTHASH, OK, HASH, SIGNAL
+
+error_msg = "Player %i, has received an inconsistent hash %s."
+
+class InconsistentHashException(Exception):
+    """Signals that not every receiver of a broadcast reported an OK hash check."""
+    pass
+
+class HashBroadcastMixin:
+    """A non-consistent broadcast scheme mainly useful for full threshold security.
+
+    A value is sent using `broadcast` and when received a hash is generated and
+    exchanged among the receivers. If a receiver receives a hash which is not equal
+    to the one he generated, then he sends an error signal to the others and
+    they stop the computation. Else he sends an ok signal and the computation continues."""
+
+    def _send_message(self, pc, sender, receivers, message):
+        """Send message as TEXT data to every receiver, using program counter pc.
+
+        The sender argument is not used."""
+        for peer_id in receivers:
+            self.protocols[peer_id].sendData(pc, TEXT, message)
+
+    def _receive_broadcast(self, pc, unique_pc, sender, receivers):
+        """Receive one broadcast message from sender.
+
+        The message is expected as TEXT data from sender. Its SHA-1 hash is
+        sent to all receivers, and when a hash is stored for every receiver
+        an OK or INCONSISTENTHASH signal is sent to all of them. Hashes and
+        signals are exchanged under unique_pc; pc is not used.
+
+        Returns a deferred which is called back with the message if every
+        receiver signals OK, and which fails with an
+        InconsistentHashException otherwise."""
+        # The result.
+        result = Deferred()
+        # The message store.
+        message = []
+        # The hash store
+        g_hashes = {}
+        # The signal store
+        signals = {}
+
+        def signal_received(signal, peer_id, message, num_receivers, hashes, signals):
+            # Store the signal.
+            signals[peer_id] = long(signal)
+            # If all signals are received then check if they are OK or INCONSISTENTHASH.
+            if num_receivers == len(signals):
+                # Every signal must be OK; a single INCONSISTENTHASH
+                # signal from any receiver makes the broadcast fail.
+                if all(OK == s for s in signals.values()):
+                    # Make the result ready.
+                    result.callback(message[0])
+                else:
+                    # Fail the result, so that whoever waits for the
+                    # broadcast learns about the inconsistency.
+                    result.errback(InconsistentHashException(error_msg % (self.id, hashes)))
+
+        def hash_received(h, unique_pc, peer_id, receivers, a_hashes):
+            # Store the hash.
+            a_hashes[peer_id] = h
+            # If we have received a hash from everybody, then compute the signal and send it.
+            if len(receivers) == len(a_hashes):
+                # First we check if the hashes we received are equal to the hash we computed ourselves.
+                own_hash = a_hashes[self.id]
+                if all(a_hashes[r] == own_hash for r in receivers):
+                    signal = OK
+                else:
+                    signal = INCONSISTENTHASH
+                # Then we send the SAME signal to everybody.
+                for receiver in receivers:
+                    self.protocols[receiver].sendData(unique_pc, SIGNAL, str(signal))
+            # The return value does not matter.
+            return None
+
+        def message_received(m, unique_pc, message, receivers, hashes):
+            # Store the message.
+            message.append(m)
+            # Compute hash of message.
+            h = sha1(m).hexdigest()
+            # Store hash.
+            hashes[self.id] = h
+            # Send the hash to all receivers.
+            for peer_id in receivers:
+                self.protocols[peer_id].sendData(unique_pc, HASH, str(h))
+
+        # Set up receivers for hashes and signals.
+        # Note, we use the unique_pc to avoid data to cross method invocation boundaries.
+        for peer_id in receivers:
+            d_hash = Deferred().addCallbacks(hash_received,
+                                             self.error_handler,
+                                             callbackArgs=(unique_pc, peer_id, receivers, g_hashes))
+            self._expect_data_with_pc(unique_pc, peer_id, HASH, d_hash)
+            d_signal = Deferred().addCallbacks(signal_received,
+                                               self.error_handler,
+                                               callbackArgs=(peer_id, message, len(receivers), g_hashes, signals))
+            self._expect_data_with_pc(unique_pc, peer_id, SIGNAL, d_signal)
+
+        # Set up receiving of the message.
+        d_message = Deferred().addCallbacks(message_received,
+                                            self.error_handler,
+                                            callbackArgs=(unique_pc, message, receivers, g_hashes))
+        self._expect_data(sender, TEXT, d_message)
+        return result
+
+
+    def broadcast(self, senders, receivers, message=None):
+        """Broadcast the message from senders to receivers.
+
+        Returns a list of deferreds if the calling player is among
+        the receivers and there are multiple senders.
+        Returns a single element if there is only one sender, or the
+        calling player is among the senders only.
+        Returns an empty list if the calling player is neither among
+        the senders nor among the receivers.
+
+        The order of the resulting list is guaranteed to be the same order
+        as the list of senders.
+
+        Senders and receivers should be lists containing the id of the senders
+        and receivers, respectively.
+
+        Note: You send implicitly to your self."""
+        assert message is None or self.id in senders
+
+        self.program_counter[-1] += 1
+
+        pc = tuple(self.program_counter)
+        if (self.id in receivers or self.id in senders):
+            results = [None] * len(senders)
+        else:
+            results = []
+
+        if self.id in senders:
+            self._send_message(pc, self.id, receivers, message)
+
+        if self.id in receivers:
+            for x, sender in enumerate(senders):
+                # Hashes and signals of each sender get their own program counter.
+                results[x] = self._receive_broadcast(pc, pc + (x,), sender, receivers)
+
+        if self.id in senders and self.id not in receivers:
+            d = Deferred()
+            d.callback(message)
+            results = [d]
+
+        self.program_counter[-1] += 1
+
+        if len(results) == 1:
+            return results[0]
+
+        return results
+
+    def list_str(self, s):
+        """Parse the string form of a sequence of strings into a list.
+
+        The first and the last character of s are dropped, the rest is
+        split at commas, and every item is stripped and loses its first and
+        last character: "('7', '9')" gives ['7', '9']."""
+        return [str(x.strip())[1:-1] for x in s[1:-1].split(',')]
+
+    def error_handler(self, ex):
+        """Print ex and return it unchanged."""
+        print "Error: ", ex
+        return ex
diff --git a/viff/test/test_hash_broadcast.py b/viff/test/test_hash_broadcast.py
new file mode 100644
--- /dev/null
+++ b/viff/test/test_hash_broadcast.py
@@ -0,0 +1,181 @@
+# Copyright 2009 VIFF Development Team.
+#
+# This file is part of VIFF, the Virtual Ideal Functionality Framework.
+#
+# VIFF is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License (LGPL) as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# VIFF is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.internet.defer import gatherResults, Deferred, DeferredList
+
+from viff.test.util import RuntimeTestCase, protocol
+from viff.field import GF
+from viff.runtime import Share
+
+from viff.comparison import Toft05Runtime
+from viff.hash_broadcast import HashBroadcastMixin
+
+class BroadcastRuntime(Toft05Runtime, HashBroadcastMixin):
+    """Mix of :class:`Toft05Runtime` and
+    :class:`HashBroadcastMixin`."""
+    pass
+
+class HashBroadcastTest(RuntimeTestCase):
+    """Test for the hash broadcast mixin."""
+
+    # Number of players.
+    num_players = 3
+
+    runtime_class = BroadcastRuntime
+
+    timeout = 10
+    @protocol
+    def test_send(self, runtime):
+        """Player 1 broadcasts 42 to players 2 and 3; every player checks the value."""
+        self.Zp = GF(6277101735386680763835789423176059013767194773182842284081)
+
+        value = 42
+
+        receivers = [2, 3]
+        if 1 == runtime.id:
+            d = runtime.broadcast([1], receivers, str(value))
+        else:
+            d = runtime.broadcast([1], receivers)
+        def check(x):
+            self.assertEquals(int(x), 42)
+        d.addCallback(check)
+        return d
+
+
+    @protocol
+    def test_send_two_senders_in_parallel(self, runtime):
+        """Player 1 broadcasts to players 2 and 3, then player 2 broadcasts to player 3."""
+        self.Zp = GF(6277101735386680763835789423176059013767194773182842284081)
+
+        def check(ls):
+            for s, x in ls:
+                self.assertEquals(int(x), 42)
+            return ls
+
+        value = 42
+
+        receivers = [2, 3]
+        if 1 == runtime.id:
+            d1 = runtime.broadcast([1], receivers, str(value))
+        else:
+            d1 = runtime.broadcast([1], receivers)
+
+        if 2 == runtime.id:
+            d2 = runtime.broadcast([2], [3], str(value))
+        else:
+            d2 = runtime.broadcast([2], [3])
+
+        ds = [d1]
+        if [] != d2:
+            ds.append(d2)
+        dls = DeferredList(ds)
+        dls.addCallback(check)
+        return dls
+
+    @protocol
+    def test_send_multiple_senders_in_one_burst(self, runtime):
+        """Players 1 and 3 broadcast to player 2 in one call; player 2 checks both values in sender order."""
+        self.Zp = GF(6277101735386680763835789423176059013767194773182842284081)
+
+        value = 42
+        if 1 == runtime.id:
+            value = 7
+
+        if 1 == runtime.id or 3 == runtime.id:
+            ds = runtime.broadcast([1, 3], [2], str(value))
+
+        if 2 == runtime.id:
+            ds = runtime.broadcast([1, 3], [2])
+            dls = DeferredList(ds)
+            def check(ls):
+                self.assertEquals(int(ls[0][1]), 7)
+                self.assertEquals(int(ls[1][1]), 42)
+                return ls
+            dls.addCallback(check)
+            return dls
+        return None
+
+
+    @protocol
+    def test_sender_in_receivers(self, runtime):
+        """Player 1 broadcasts 42 to players 1, 2 and 3, i.e. also to itself."""
+        self.Zp = GF(6277101735386680763835789423176059013767194773182842284081)
+
+        value = 42
+        if 1 == runtime.id:
+            d = runtime.broadcast([1], [1, 2, 3], str(value))
+        else:
+            d = runtime.broadcast([1], [1, 2, 3])
+
+        def check(x):
+            self.assertEquals(int(x), 42)
+            return x
+        d.addCallback(check)
+        return d
+
+    @protocol
+    def test_complex(self, runtime):  # Players 1, 2 and 3 all broadcast the same tuple string to each other.
+        def check(ls):
+            for x, v in ls:
+                self.assertEquals(runtime.list_str(v), ['7', '9', '13'])
+
+        receivers = [1, 2, 3]
+        def exchange((xi, rhoi1, rhoi2)):
+            # Send the string form of the tuple to all receivers; the receivers are also the senders.
+            ds = runtime.broadcast(receivers, receivers, str((str(xi), str(rhoi1), str(rhoi2))))
+            dls = DeferredList(ds)
+            dls.addCallbacks(check, runtime.error_handler)
+            return dls
+
+        result = Deferred()
+        result.addCallbacks(exchange, runtime.error_handler)
+        result.callback((7, 9, 13))
+        return result
+
+    @protocol
+    def test_complex2(self, runtime):  # Three consecutive broadcasts with different sender and receiver sets.
+        def check(ls):
+            if (2 == runtime.id) or (1 == runtime.id):
+                self.assertEquals(ls[0][1], "V1")
+                self.assertEquals(ls[1][1], "V1")
+                self.assertEquals(ls[2][1], "V1")
+                self.assertEquals(ls[3][1], "V2")
+            else:
+                self.assertEquals(ls[0][1], "V1")
+                self.assertEquals(ls[1][1], "V1")
+                self.assertEquals(ls[2][1], "V1")
+                self.assertEquals(ls[3][1], "V2")
+                self.assertEquals(ls[4][1], "V2")
+        field = self.Zp  # NOTE(review): field is not used in the rest of this test.
+        results = []
+        results += runtime.broadcast(runtime.players.keys(), runtime.players.keys(), "V1")
+        if runtime.id in [1, 2]:
+            v = runtime.broadcast([1, 2], [3], "V2")
+            if isinstance(v, list):
+                results += v
+            else:
+                results.append(v)
+        else:
+            results += runtime.broadcast([1, 2], [3])
+        if 3 == runtime.id:
+            results += [runtime.broadcast([3], runtime.players.keys(), str(7))]
+        else:
+            results += [runtime.broadcast([3], runtime.players.keys())]
+        dls = DeferredList(results)
+        runtime.schedule_callback(dls, check)
+        dls.addErrback(runtime.error_handler)
+        return dls
_______________________________________________
viff-patches mailing list
[email protected]
http://lists.viff.dk/listinfo.cgi/viff-patches-viff.dk
