Hi all,
here is the problem and how to reproduce it:

- In netkit, open two terminals and start ntk in each of them; wait until the radar of each node sees the other node;
- Now start ntk in a third terminal and wait until its radar sees the neighbours;
- A 'Bad file descriptor' exception is raised when asyncore.dispatcher.connect() is called.

I've attached a patch to this email that disables the radar debug prints (since the radar works well) and replaces them with more useful ones.

When a node sees a neighbour, a TCPClient is created in a tasklet. Note that the connection has not been established yet. TCPClient contains a stacklesssocket, which is itself a "wrapper" around the real standard socket. Each method call on the stacklesssocket is forwarded to the dispatcher. At this point, however, the connect method has not been called yet.
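To make the "created now, connected later" pattern concrete, here is a minimal sketch. It is not the real stacklesssocket: LazySocketWrapper is a hypothetical name, and it is written for a current Python 3 interpreter rather than for the Stackless Python 2 that ntk uses.

```python
import socket


class LazySocketWrapper(object):
    """Hypothetical stand-in for a wrapper such as stacklesssocket.

    The real socket is created immediately but is not connected; every
    attribute the wrapper does not define itself is forwarded to it.
    """

    def __init__(self, family=socket.AF_INET, type_=socket.SOCK_STREAM):
        self._sock = socket.socket(family, type_)
        self.connected = False

    def connect(self, address):
        # The connection is established only here, possibly long after
        # the wrapper was created.
        self._sock.connect(address)
        self.connected = True

    def __getattr__(self, name):
        # Called only for attributes not found on the wrapper itself,
        # so fileno(), send(), close(), ... reach the real socket.
        return getattr(self._sock, name)
```

Anything that closes the inner socket between __init__ and connect() goes unnoticed until the first remote call.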

When the third node joins the network, its radar finds two neighbours, so it creates a TCPClient towards each of them; the NEIGH_NEW event is then generated and all its listeners are executed. On the other hand, the other two nodes see that a new node has arrived, so they generate NEIGH_NEW events and all their listeners are executed too.

One of the listeners executed is Etp.etp_new_changed, defined in qspn.py. On the last line of this method we find the first remote call (neigh.ntkd.etp.etp_exec(...)), so the flow goes, through TCPClient and stacklesssocket, to the dispatcher's connect() method. When asyncore.dispatcher.connect is called, the exception is raised. Stepping inside, we can see that stackless.sock._sock is of type _closedsocket.

So, at some point between creation and connection, the socket has been closed.
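The symptom itself is easy to reproduce in isolation. The sketch below assumes a POSIX system and a current Python 3 interpreter (where a closed socket reports descriptor -1 instead of being replaced by _closedsocket); connect_after_close is a hypothetical helper name, not something from ntk.

```python
import errno
import socket


def connect_after_close(address):
    """Return the errno raised when connecting an already closed socket."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.close()  # simulate "somebody closed it before connect()"
    try:
        sock.connect(address)
    except OSError as exc:  # socket.error is an alias of OSError on Python 3
        return exc.errno
    return None


if __name__ == "__main__":
    # The address is never actually contacted: the call fails earlier.
    print(connect_after_close(("127.0.0.1", 9)) == errno.EBADF)
```

The error does not depend on the remote side at all, which is consistent with the servers being reachable via telnet.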

I've used telnet to connect to the microsock TCP server, and it works. I have read the tests and I have also tried to test TCPClient, TCPServer, etc. separately, and everything works well outside ntk. So I think the problem is probably not there, nor in microsock.py. The problem should be elsewhere, or it could be something wrong in how asyncore is used (is it true that asyncore will be deprecated in newer versions of Python? http://mail.python.org/pipermail/python-list/2004-November/293572.html ).

Now the last important note: in the dispatcher definition, inside microsock.py, read the comment at the definition of the fileno() method.

If we restore 'return self.socket.fileno()' in place of 'return self.socket._fileno' (note that my patch doesn't do this) and start just two netkit vhosts, we can see that 'Bad file descriptor' is now raised in stacklesssocket.__del__. At this point only UDP is in use, so I decided to make a small modification in radar.py. I want to emphasize that I don't know whether this modification is semantically correct (it probably is not), but if, in the Radar.reply method, we replace the commented-out line below with the one that follows it (my patch doesn't do this)

#rpc.BcastClient(devs=[_rpc_caller.dev]).radar.time_register(radar_id, self.netid)
           self.broadcast.radar.time_register(radar_id, self.netid)

we can see that dispatcher.fileno() works well and no 'Bad file descriptor' exception is raised in stacklesssocket.__del__ (but if we then try to connect the third node, the problem remains when TCP is used). Note that the radar still works.

In my opinion this could be caused by the following: each node has a BcastClient instance, with a socket that is connected to '<broadcast>':269 (needed by the radar). If we instantiate a new BcastClient inside the .reply method, a new socket is created that also wants to connect to '<broadcast>':269, and then the descriptor is bad (at least this is what I suppose).
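One way to probe this supposition outside ntk is sketched below. It is only an assumption about a possible mechanism, not something confirmed in the ntk code: two broadcast sockets are independent at the OS level, but a descriptor number that was cached before close() (as with _fileno) can outlive its socket, and on POSIX the number is typically handed out again to the next socket. make_bcast_socket and fd_lifetime_demo are hypothetical names, and the code targets a current Python 3.

```python
import socket


def make_bcast_socket():
    """Create a UDP socket allowed to send broadcast datagrams."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    return s


def fd_lifetime_demo():
    long_lived = make_bcast_socket()   # like the node's own BcastClient
    temporary = make_bcast_socket()    # like one created inside reply()
    cached_fd = temporary.fileno()     # what a cached _fileno would hold
    temporary.close()
    # A socket created afterwards may receive the same descriptor number.
    newcomer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = {
        "long_lived_still_valid": long_lived.fileno() >= 0,
        "temporary_fd_after_close": temporary.fileno(),
        "cached_fd": cached_fd,
        "newcomer_fd": newcomer.fileno(),
    }
    long_lived.close()
    newcomer.close()
    return result


if __name__ == "__main__":
    print(fd_lifetime_demo())
```

If cached_fd and newcomer_fd come out equal, any table keyed by descriptor number (asyncore keeps such a map) could confuse the old socket with the new one.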

My impression is that something is wrong in how sockets are managed, but not in rpc, microsock or the other lower-level modules.
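To find out who closes the socket, a small debugging aid along these lines might help. It is a sketch for a current Python 3 interpreter; TracedSocket is a hypothetical class, and in ntk the same idea would have to be applied to the close() of stacklesssocket or of the dispatcher instead.

```python
import socket
import traceback


class TracedSocket(socket.socket):
    """Hypothetical debugging aid: remembers where close() was called."""

    closed_from = None

    def close(self):
        # Record the call stack of the first explicit close() only.
        if self.closed_from is None:
            self.closed_from = "".join(traceback.format_stack())
        super().close()


if __name__ == "__main__":
    s = TracedSocket(socket.AF_INET, socket.SOCK_STREAM)
    s.close()
    print(s.closed_from)
```

Printing closed_from just before connect() would show the stack of whoever closed the socket. A close that happens only through garbage collection does not go through this method, so it would not be recorded.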




Index: ntk/lib/rpc.py
===================================================================
--- ntk/lib/rpc.py      (revision 1526)
+++ ntk/lib/rpc.py      (working copy)
@@ -148,8 +148,7 @@
           globals()
         """
 
-        logging.debug("func_get: "+str(func_name))
-
+        
         splitted = func_name.split('.')
 
         if not len(splitted):
@@ -170,7 +169,6 @@
         return None
 
     def _dispatch(self, caller, func_name, params):
-        logging.debug("_dispatch: "+func_name+"("+str(params)+")")
         func = self.func_get(func_name)
         if func is None:
             raise RPCFuncNotRemotable('Function %s is not remotable' % func_name)
@@ -187,9 +185,7 @@
         try:
             response = self._dispatch(caller, func, params)
         except Exception, e:
-            logging.debug(str(e))
             response = ('rmt_error', str(e))
-        logging.debug("dispatch response: "+str(response))
         return response
 
     def marshalled_dispatch(self, caller, data):
@@ -201,7 +197,6 @@
                 error=1
         if error or not isinstance(unpacked, tuple) or not len(unpacked) == 2:
             e = 'Malformed packet received from '+caller.ip
-            logging.debug(e)
             response = ('rmt_error', str(e))
         else:
             response = self.dispatch(caller, *unpacked)
@@ -247,21 +242,17 @@
     return ""
 
 def stream_request_handler(sock, clientaddr, dev, rpcdispatcher):
-    logging.debug('Connected from %s, dev %s', clientaddr, dev)
     caller = CallerInfo(clientaddr[0], clientaddr[1], dev, sock)
     while True:
         try:
             data = _data_unpack_from_stream_socket(sock)
             if not data: break
-            logging.debug('Handling data: %s', data)
             response = rpcdispatcher.marshalled_dispatch(caller, data)
-            logging.debug('Response: %s', response)
         except RPCError:
-            logging.debug('An error occurred during request handling')
+            pass
 
         sock.send(_data_pack(response))
         #self.request.close()
-        logging.debug('Response sent')
     sock.close()
 
 def micro_stream_request_handler(sock, clientaddr, dev, rpcdispatcher):
@@ -313,7 +304,6 @@
         if not recv_encoded_data:
                 raise RPCNetError, 'connection closed before reply'
         recv_data = rencode.loads(recv_encoded_data)
-        logging.debug("Recvd data: "+str(recv_data))
 
         # Handling errors
         # I receive a message with the following format:
@@ -325,6 +315,7 @@
         return recv_data
 
     def connect(self):
+        logging.debug("Trying connecting this tcp socket %s ", str(self.socket))
         self.socket.connect((self.host, self.port))
         self.connected = True
 
@@ -345,13 +336,11 @@
     Handles all request and try to decode them.
     '''
     caller = CallerInfo(clientaddr[0], clientaddr[1], dev, sock)
-    logging.debug('UDP packet from %s, dev %s', clientaddr, dev)
     try:
         data = _data_unpack_from_buffer(packet)
-        logging.debug('Handling data: %s', data)
         response = rpcdispatcher.marshalled_dispatch(caller, data)
     except RPCError:
-        logging.debug('An error occurred during request handling')
+        pass
 
 def micro_dgram_request_handler(sock, clientaddr, packet, dev, rpcdispatcher):
     micro(dgram_request_handler, (sock, clientaddr, packet, dev, rpcdispatcher))
Index: ntk/core/radar.py
===================================================================
--- ntk/core/radar.py   (revision 1526)
+++ ntk/core/radar.py   (working copy)
@@ -233,7 +233,10 @@
 
                 # create a TCP connection to the neighbour
                 self.ntk_client[key] = rpc.TCPClient(ip_to_str(key))
+                
+                logging.debug("Created TCPClient for %s stored at %s with this inner socket %s" % (ip_to_str(key), str(self.ntk_client[key]), str(self.ntk_client[key].socket)))
 
+
                 # send a message notifying we added a node
                 self.events.send('NEIGH_NEW',
                                  (Neigh(bestdev=ip_table[key].bestdev,
@@ -377,8 +380,7 @@
         """ Send broadcast packets and store the results in neigh """
 
         self.radar_id = randint(0, 2**32-1)
-        logging.debug('radar scan %s' % self.radar_id)
-
+        
         # we're sending the broadcast packets NOW
         self.bcast_send_time = self.xtime.time()
 
Index: ntk/core/qspn.py
===================================================================
--- ntk/core/qspn.py    (revision 1526)
+++ ntk/core/qspn.py    (working copy)
@@ -25,8 +25,12 @@
 def is_listlist_empty(L):
         """L is a list of lists.
            Returns true if L=[[],[], ...]"""
-        return sum(filter(isnot_empty, L)) == 0
+        for l in L:
+           if len(filter(isnot_empty, l)) != 0:
+               return False
+        return True
 
+
 class Etp:
     """Extended Tracer Packet"""
 