Alpt ha scritto:
On Sun, Mar 15, 2009 at 07:27:30PM +0000, <Francesco Losciale>:
#rpc.BcastClient(devs=[_rpc_caller.dev]).radar.time_register(radar_id, self.netid)
           self.broadcast.radar.time_register(radar_id, self.netid)

We can use self.broadcast.radar.time_register(radar_id, self.netid). The
difference is just in the device used to send the broadcast message. With
self.broadcast all the available devices are used. This isn't optimal.
Btw, the problem isn't here.

Yes, what I said doesn't make sense, because sockets use a random local port in addition to the remote one :P. However, there's no need to replace this line.

I said in the previous mail that dispatcher.close() is called in the interval between socket creation and socket connection. So I've set a breakpoint in the close() method that calls asyncore.dispatcher.close(), in this way:

def close(self):
       # Debugging aid: stop in pdb on every close() call, so that `where`
       # shows the call stack of whoever is closing the socket.
       import pdb
       pdb.set_trace()
       # After the breakpoint, carry on with the regular asyncore close.
       asyncore.dispatcher.close(self)

Open just 2 virtual hosts, and this is the pdb output:

pc3:~# ./ntk_verbose
DEBUG I am 194.227.149.129
> /hosthome/workspace/ntk/pyntk/ntk/lib/microsock.py(250)close()
-> asyncore.dispatcher.close(self)
(Pdb) where
 /hosthome/workspace/ntk/pyntk/ntk/lib/rpc.py(339)dgram_request_handler()
-> response = rpcdispatcher.marshalled_dispatch(caller, data)
 /hosthome/workspace/ntk/pyntk/ntk/lib/rpc.py(201)marshalled_dispatch()
-> response = self.dispatch(caller, *unpacked)
 /hosthome/workspace/ntk/pyntk/ntk/lib/rpc.py(185)dispatch()
-> response = self._dispatch(caller, func, params)
 /hosthome/workspace/ntk/pyntk/ntk/lib/rpc.py(176)_dispatch()
-> return func(caller, *params)
 /hosthome/workspace/ntk/pyntk/ntk/core/radar.py(403)reply()
-> rpc.BcastClient(devs=[_rpc_caller.dev]).radar.time_register(radar_id, self.netid)
 /hosthome/workspace/ntk/pyntk/ntk/lib/microsock.py(141)__del__()
-> self.dispatcher.close()
> /hosthome/workspace/ntk/pyntk/ntk/lib/microsock.py(250)close()
-> asyncore.dispatcher.close(self)
(Pdb)

That catches the close of the UDP socket. To catch the TCP socket instead, we can do it this way, but you need to open 3 hosts:

   def close(self):
       # Debugging aid: same breakpoint as before, but only triggered when
       # the socket being closed is of type SOCK_STREAM.
       import pdb
       if self.socket.type == SOCK_STREAM: pdb.set_trace()
       # After the (conditional) breakpoint, do the regular asyncore close.
       asyncore.dispatcher.close(self)

and we can see that .close() is called from handle_read.

I've attached a patch that enables RPC.
Index: ntk/lib/microsock.py
===================================================================
--- ntk/lib/microsock.py        (revision 1527)
+++ ntk/lib/microsock.py        (working copy)
@@ -268,11 +268,8 @@
 
     # asyncore doesn't support this.  Why not?
     def fileno(self):
-            # XXX: self.socket.fileno() raises a Bad file descriptor error.
-            #      Therefore, we're using _fileno as a hack. This has to be
-            #      cleaned.
-            # return self.socket.fileno() 
-        return self._fileno
+        return self.socket.fileno() 
+        
 
     def handle_accept(self):
         if self.acceptChannel and self.acceptChannel.ch.balance < 0:
@@ -304,12 +301,8 @@
                 self.recvChannel.send((ret, address))
             else:
                 ret = asyncore.dispatcher.recv(self, self.maxreceivebuf)
-                # Not sure this is correct, but it seems to give the
-                # right behaviour.  Namely removing the socket from
-                # asyncore.
-                if not ret:
-                    self.close()
-                self.recvChannel.send(ret)
+                if ret:
+                    self.recvChannel.send(ret)
         except stdsocket.error, err:
             # XXX Is this correct?
             # If there's a read error assume the connection is
Index: ntk/lib/crypto.py
===================================================================
--- ntk/lib/crypto.py   (revision 1527)
+++ ntk/lib/crypto.py   (working copy)
@@ -17,11 +17,16 @@
 # Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 ##
  
-from M2Crypto.RSA import RSA, gen_key, load_pub_key
+from M2Crypto.RSA import RSA, gen_key, load_pub_key, load_key
 
 import hashlib
 
- 
+def fnv_32_buf(msg, hval=0x811c9dc5):
+    for c in xrange(len(msg)):
+        hval += (hval<<1) + (hval<<4) + (hval<<7) + (hval<<8) + (hval<<24)
+        hval ^= ord(msg[c])
+    return hval
+
 def sha1(msg):
     s = hashlib.sha1()
     s.update(msg)
@@ -32,30 +37,24 @@
     m.update(msg)
     return m.digest()
  
-def genkeys(keysize=1024, rsa_pub_exponent=65537):
-    """ Generate a key pair
- 
-        @return M2Crypto.RSA.RSA 
-    """
-    def do_nothing: pass
-    return gen_key(keysize, rsa_pub_exponent, callback=do_nothing) 
- 
-def get_pub_key(keys):
-        '''Return just the public key'''
-        # TODO: maybe just "return keys.pub" ?
-        pass
+# 
+# Usage
+# 
+# keys = KeyPair("C:\\files\pub_key.txt", "C:\\files\prv_key.txt")
+#  keys.load()
+# or
+#  keys.generate() 
+#  keys.save()
+# signature = keys.sign("prova")
+# pubk = keys.get_pub_key()
 
-def sign(key, msg):
-    """ Sign a message with key
- 
-        @param key istance of M2Crypto.RSA.RSA
-        @param msg
- 
-        @return signature 
-    """
-    return keys.sign(sha1(msg))
- 
-def verify(key, msg, signature): 
+# send public key with signature and message
+
+# if verify("prova", signature, pubk):
+#    print "Everything works well"
+
+
+def verify(msg, signature, pubk): 
     """ Verify a message signature with key
  
         @param key istance of M2Crypto.RSA.RSA
@@ -64,11 +63,48 @@
  
         @return boolean
     """
-    return keys.verify(sha1(msg), signature)
+    return pubk.verify(sha1(msg), signature)
+
  
-
-def save_keys(file):
+class KeyPair():
+    
+    def __init__(self, pubk_path, prvk_path):
+        self.pubk = None
+        self.keys = None
+        self.pubk_path = pubk_path
+        self.prvk_path = prvk_path
+        
+    def generate(self, keysize=1024, rsa_pub_exponent=65537):
+        """ Generate a key pair
+ 
+            @return M2Crypto.RSA.RSA 
+        """
+        def do_nothing(): pass
+        self.keys = gen_key(keysize, rsa_pub_exponent, callback=do_nothing)
+        
+ 
+    def get_pub_key(self):
+        '''Return just the public key'''
+        if not self.pubk:                   
+            return load_pub_key(self.pubk_path)
+        return self.pubk                       
+        
+    def sign(self, msg):
+        """ Sign a message with key
+            @param msg
+ 
+            @return signature 
+        """
+        return self.keys.sign(sha1(msg))
+ 
+    def save(self): 
         '''Save the keys to a file'''
-        pass
-def load_keys(file):
-        pass
+        def do_nothing(null): return "passphrase"
+        self.keys.save_key(self.prvk_path, callback=do_nothing)
+        self.keys.save_pub_key(self.pubk_path)
+        
+    def load(self): 
+        def do_nothing(null): return "passphrase"
+        self.keys = load_key(self.prvk_path, callback=do_nothing)
+        self.pubk = load_pub_key(self.pubk_path)
+ 
\ No newline at end of file
Index: ntk/lib/rpc.py
===================================================================
--- ntk/lib/rpc.py      (revision 1527)
+++ ntk/lib/rpc.py      (working copy)
@@ -148,8 +148,6 @@
           globals()
         """
 
-        logging.debug("func_get: "+str(func_name))
-
         splitted = func_name.split('.')
 
         if not len(splitted):
@@ -170,7 +168,6 @@
         return None
 
     def _dispatch(self, caller, func_name, params):
-        logging.debug("_dispatch: "+func_name+"("+str(params)+")")
         func = self.func_get(func_name)
         if func is None:
             raise RPCFuncNotRemotable('Function %s is not remotable' % 
func_name)
@@ -187,9 +184,7 @@
         try:
             response = self._dispatch(caller, func, params)
         except Exception, e:
-            logging.debug(str(e))
             response = ('rmt_error', str(e))
-        logging.debug("dispatch response: "+str(response))
         return response
 
     def marshalled_dispatch(self, caller, data):
@@ -201,7 +196,6 @@
                 error=1
         if error or not isinstance(unpacked, tuple) or not len(unpacked) == 2:
             e = 'Malformed packet received from '+caller.ip
-            logging.debug(e)
             response = ('rmt_error', str(e))
         else:
             response = self.dispatch(caller, *unpacked)
@@ -247,21 +241,17 @@
     return ""
 
 def stream_request_handler(sock, clientaddr, dev, rpcdispatcher):
-    logging.debug('Connected from %s, dev %s', clientaddr, dev)
     caller = CallerInfo(clientaddr[0], clientaddr[1], dev, sock)
     while True:
         try:
             data = _data_unpack_from_stream_socket(sock)
             if not data: break
-            logging.debug('Handling data: %s', data)
             response = rpcdispatcher.marshalled_dispatch(caller, data)
-            logging.debug('Response: %s', response)
         except RPCError:
-            logging.debug('An error occurred during request handling')
+            pass
 
         sock.send(_data_pack(response))
         #self.request.close()
-        logging.debug('Response sent')
     sock.close()
 
 def micro_stream_request_handler(sock, clientaddr, dev, rpcdispatcher):
@@ -313,7 +303,6 @@
         if not recv_encoded_data:
                 raise RPCNetError, 'connection closed before reply'
         recv_data = rencode.loads(recv_encoded_data)
-        logging.debug("Recvd data: "+str(recv_data))
 
         # Handling errors
         # I receive a message with the following format:
@@ -345,13 +334,11 @@
     Handles all request and try to decode them.
     '''
     caller = CallerInfo(clientaddr[0], clientaddr[1], dev, sock)
-    logging.debug('UDP packet from %s, dev %s', clientaddr, dev)
     try:
         data = _data_unpack_from_buffer(packet)
-        logging.debug('Handling data: %s', data)
         response = rpcdispatcher.marshalled_dispatch(caller, data)
     except RPCError:
-        logging.debug('An error occurred during request handling')
+        pass
 
 def micro_dgram_request_handler(sock, clientaddr, packet, dev, rpcdispatcher):
     micro(dgram_request_handler, (sock, clientaddr, packet, dev, 
rpcdispatcher))
@@ -441,4 +428,4 @@
         self.rpc_call(func_name, params)
 
     def __del__(self):
-        self.close()
+        pass
Index: ntk/ntkd.py
===================================================================
--- ntk/ntkd.py (revision 1527)
+++ ntk/ntkd.py (working copy)
@@ -32,8 +32,11 @@
 from ntk.config import settings, ImproperlyConfigured
 from ntk.lib.micro import micro, allmicro_run
 from ntk.network import NICManager, Route
+from ntk.network.inet import ip_to_str
 from ntk.wrap.sock import Sock
 
+import logging
+
 class NtkNode(object):
 
     def __init__(self,
@@ -70,6 +73,9 @@
         self.radar = radar.Radar(rpcbcastclient, xtimemod)
         self.neighbour = self.radar.neigh
         self.maproute = maproute.MapRoute(settings.LEVELS, self.gsize, None)
+        
+        logging.debug("I am %s " % ip_to_str((self.maproute.me)))
+        
         self.etp = qspn.Etp(self.radar, self.maproute)
 
         self.p2p = p2p.P2PAll(self.radar, self.maproute)
Index: ntk/core/radar.py
===================================================================
--- ntk/core/radar.py   (revision 1527)
+++ ntk/core/radar.py   (working copy)
@@ -234,6 +234,8 @@
                 # create a TCP connection to the neighbour
                 self.ntk_client[key] = rpc.TCPClient(ip_to_str(key))
 
+                logging.debug("Neighbour found: %s " % ip_to_str(key))
+
                 # send a message notifying we added a node
                 self.events.send('NEIGH_NEW',
                                  (Neigh(bestdev=ip_table[key].bestdev,
@@ -377,8 +379,7 @@
         """ Send broadcast packets and store the results in neigh """
 
         self.radar_id = randint(0, 2**32-1)
-        logging.debug('radar scan %s' % self.radar_id)
-
+        
         # we're sending the broadcast packets NOW
         self.bcast_send_time = self.xtime.time()
 
Index: ntk/core/route.py
===================================================================
--- ntk/core/route.py   (revision 1527)
+++ ntk/core/route.py   (working copy)
@@ -66,6 +66,8 @@
 
 class NullRem(Rem):
     """The equivalent of None for the REM"""
+    def __init__(self):
+            Rem.__init__(self, 0)
     def __add__(self, b):
             return b
     def __radd__(self, b):
@@ -409,7 +411,7 @@
         lvl = self.nip_cmp(self.me, neigh.nip)
         return (lvl, neigh.nip[lvl])
 
-    def bestroutes_get(self, f=ftrue):
+    def bestroutes_get(self, f=ftrue, packed=False):
         """Returns the list of all the best routes of the map.
 
            Let L be the returned list, then L[lvl] is the list of all the best
@@ -420,10 +422,19 @@
            If a function `f' has been specified, then each element L[lvl][i]
            in L is such that f(L[lvl][i])==True
            """
-        return [
-                [ (dst, br.gw, br.rem)
-                        for dst in xrange(self.gsize)
-                            for br in [self.node_get(lvl, dst).best_route()]
-                                if br is not None and f((dst, br.gw, br.rem))
-                ] for lvl in xrange(self.levels)
-               ]
+        if packed:
+            return [
+                    [ (dst, br.gw, br.rem._pack())
+                            for dst in xrange(self.gsize)
+                                for br in [self.node_get(lvl, 
dst).best_route()]
+                                    if br is not None and f((dst, br.gw, 
br.rem))
+                    ] for lvl in xrange(self.levels)
+                   ]
+        else:
+            return [
+                    [ (dst, br.gw, br.rem)
+                            for dst in xrange(self.gsize)
+                                for br in [self.node_get(lvl, 
dst).best_route()]
+                                    if br is not None and f((dst, br.gw, 
br.rem))
+                    ] for lvl in xrange(self.levels)
+                   ]
Index: ntk/core/hook.py
===================================================================
--- ntk/core/hook.py    (revision 1527)
+++ ntk/core/hook.py    (working copy)
@@ -150,7 +150,7 @@
         lvl = H[1][0]
         fnl = H[1][1]
         newnip[lvl] = choice(fnl)
-        newnip.append(0)
+        
         for l in reversed(xrange(lvl)): newnip[l]=randint(0, 
self.maproute.gsize)
 
         # If we are alone, let's generate our netid
Index: ntk/core/qspn.py
===================================================================
--- ntk/core/qspn.py    (revision 1527)
+++ ntk/core/qspn.py    (working copy)
@@ -21,6 +21,8 @@
 from ntk.lib.event import Event
 from ntk.core.route import NullRem, DeadRem
 
+import logging
+
 def is_listlist_empty(l):
         """
             Returns true if l=[[],[], ...]
@@ -76,7 +78,7 @@
 
         ## Forward the ETP to the neighbours
         flag_of_interest=1
-        TP = [(self.maproute.me, NullRem())]    # Tracer Packet included in
+        TP = [(self.maproute.me, NullRem()._pack())]    # Tracer Packet 
included in
         block_lvl = 0                           # the first block of the ETP
         etp = (R2, [(block_lvl, TP)], flag_of_interest)
         self.etp_forward(etp, [neigh.id])
@@ -87,7 +89,13 @@
         """Builds and sends a new ETP for the changed link case
 
         If oldrem=None, the node `neigh' is considered new."""
-
+        
+        # resolve this: if don't swait, ENETUNREACH is raised,
+        # probably because at this moment i don't have routes
+        from ntk.wrap.xtime import swait
+        swait(5000) # waiting for route caching
+        
+        
         ## Update the map
         if oldrem == None:
                 self.maproute.routeneigh_add(neigh)
@@ -98,7 +106,7 @@
         ## Create R
         def gw_isnot_neigh((dst, gw, rem)):
                 return gw != neigh.id
-        R = self.maproute.bestroutes_get(gw_isnot_neigh)
+        R = self.maproute.bestroutes_get(gw_isnot_neigh, packed=True)
         if is_listlist_empty(R):
                 # R is empty: no need to proceed
                 return
@@ -110,9 +118,11 @@
         R=map(takeoff_gw_lvl, R)
         ##
 
+        logging.debug("sending ETP for NEW_NEIGH")
+        
         ## Send the ETP to `neigh'
         flag_of_interest=1
-        TP = [(self.maproute.me[0], NullRem)]
+        TP = [(self.maproute.me[0], NullRem()._pack())]
         etp = (R, [(0, TP)], flag_of_interest)
         neigh.ntkd.etp.etp_exec(self.maproute.me, *etp)
         ##
@@ -131,7 +141,9 @@
              TP is a list of (hop, rem) pairs.
         flag_of_interest: a boolean
         """
-
+        
+        logging.debug("executing ETP")
+        
         gwnip   = sender_nip
         neigh   = self.neigh.ip_to_neigh(gwnip)
         gw      = neigh.id
Index: ntk/core/krnl_route.py
===================================================================
--- ntk/core/krnl_route.py      (revision 1527)
+++ ntk/core/krnl_route.py      (working copy)
@@ -28,6 +28,8 @@
 from ntk.network import Route as KRoute
 from ntk.network.inet import ip_to_str, lvl_to_bits
 
+import logging 
+
 class KrnlRoute(object):
     def __init__(self, neigh, maproute):
         self.maproute = maproute
@@ -73,7 +75,8 @@
         ipstr = ip_to_str(neigh.ip)
         dev = neigh.bestdev[0]
         gwipstr = ipstr
-
+        
+        logging.debug("New route towards %s " % ipstr)
         KRoute.add(ipstr, lvl_to_bits(0), dev, gwipstr)
 
     def neigh_rem_changed(self, neigh):
_______________________________________________
Netsukuku mailing list
[email protected]
http://lists.dyne.org/mailman/listinfo/netsukuku

Reply via email to