I am trying to get netsukuku to work on a virtual network on netkit.
I am experiencing strange behavior and errors that I think are
coming from the microsock module.

So, I created a little test. I built it from the core parts of the
rpc.py module, to test the underlying modules micro, microsock, and the like.
(BTW I don't get why stream_request_handler uses sock.send while
TCPClient.rpc_call uses self.socket.sendall)
The test creates many tasklets and makes simultaneous connections.

When I run it with delays of 5 seconds, it works as expected. When I
run it with shorter delays (1 second), it repeatedly produces the
following message:
error: uncaptured python exception, closing channel
<ntk.lib.microsock.dispatcher connected at 0x82a7e4c> (<type
'exceptions.MemoryError'>:
[/opt/stackless/lib/python2.6/asyncore.py|read|74]
[/opt/stackless/lib/python2.6/asyncore.py|handle_read_event|413]
[../ntk/lib/microsock.py|handle_read|306]
[/opt/stackless/lib/python2.6/asyncore.py|recv|362])

Please have a look at the attached test and let me know whether this
behavior is expected with this type of usage, or whether the microsock
module needs some debugging.

--Luca
import struct

import sys
sys.path.append('..')

import ntk.lib.rencode as rencode
from ntk.lib.micro import micro, microfunc, allmicro_run, Channel
from ntk.wrap.sock import Sock
import ntk.wrap.xtime as xtime
import ntk.lib.logger as logging

# Size in bytes of the length header that precedes every framed message.
# "I" without a byte-order prefix is a native-order, native-size unsigned int.
_data_hdr_sz = struct.calcsize("I")


def _data_pack(data):
    """Frame *data* for the stream: length header followed by the payload.

    data: the already-serialised payload (a byte string).
    Returns the header (``len(data)`` packed as "I") concatenated with data.
    """
    return struct.pack("I", len(data)) + data


def _recv_exactly(sock, count):
    """Read exactly *count* bytes from *sock*.

    Returns the bytes read, or None if the peer closed the stream (recv
    returned an empty string) before *count* bytes arrived.
    """
    chunks = []
    missing = count
    while missing > 0:
        chunk = sock.recv(missing)
        if not chunk:
            return None
        chunks.append(chunk)
        missing -= len(chunk)
    # One join instead of repeated "+=" keeps large payloads linear-time.
    return b"".join(chunks)


def _data_unpack_from_stream_socket(socket):
    """Read one message framed by _data_pack from a stream socket.

    socket: a connected object exposing ``recv(n)``.
    Returns the payload, or an empty byte string when the connection is
    closed before a complete header or payload has been received.  A
    zero-length message also yields an empty byte string, so callers cannot
    tell the two cases apart.

    NOTE(review): the length header is trusted as-is and passed straight to
    recv().  If the stream ever gets out of step, four arbitrary bytes become
    the requested size -- possibly related to the MemoryError seen in
    asyncore's recv; to be confirmed.
    """
    header = _recv_exactly(socket, _data_hdr_sz)
    if header is None:
        return b""
    dataLength = struct.unpack("I", header)[0]
    payload = _recv_exactly(socket, dataLength)
    if payload is None:
        return b""
    return payload

def stream_request_handler(sock, clientaddr):
    """Serve one accepted connection until the peer closes it.

    Every framed request read from *sock* is answered with the same long,
    rencode-serialised string.  The request content itself is ignored.

    sock: the accepted stream socket.
    clientaddr: the peer address returned by accept(); unused here.
    """
    try:
        while True:
            data = _data_unpack_from_stream_socket(sock)
            if not data:
                break
            response = rencode.dumps('result ' * 20) # a long string
            # send() may write only part of the buffer; sendall() is what
            # TCPClient.rpc_call uses, and it keeps the framing intact.
            sock.sendall(_data_pack(response))
    finally:
        # Release the connection even if reading or replying raised.
        sock.close()

def micro_stream_request_handler(sock, clientaddr):
    """Pass stream_request_handler and its arguments to micro().

    Presumably micro() runs the handler as a separate microthread so the
    caller is not blocked by one client -- confirm in ntk.lib.micro.
    """
    handler_args = (sock, clientaddr)
    micro(stream_request_handler, handler_args)

def TCPServer(addr=('', 269), sockmodgen=Sock, request_handler=stream_request_handler):
    """Listen on *addr* and pass every accepted connection to *request_handler*.

    addr: (host, port) tuple to bind to.
    sockmodgen: callable returning a socket-module-like object; it must
        provide socket(), AF_INET, SOCK_STREAM, SOL_SOCKET and SO_REUSEADDR.
    request_handler: callable invoked as request_handler(sock, clientaddr)
        for each accepted connection.

    Does not return normally: the accept loop runs until an exception
    propagates out of it.
    """
    sockmod = sockmodgen()
    listener = sockmod.socket(sockmod.AF_INET, sockmod.SOCK_STREAM)
    try:
        listener.setsockopt(sockmod.SOL_SOCKET, sockmod.SO_REUSEADDR, 1)
        listener.bind(addr)
        listener.listen(8)
        while True:
            sock, clientaddr = listener.accept()
            request_handler(sock, clientaddr)
    finally:
        # Do not leave the listening socket open when bind/listen/accept or
        # the handler raises.
        listener.close()

@microfunc(True)
def MicroTCPServer(addr=('', 269), sockmodgen=Sock):
    """Run TCPServer on *addr*, handling each connection through
    micro_stream_request_handler.

    The microfunc(True) decorator presumably makes the call return at once
    and run the server in its own microthread -- confirm in ntk.lib.micro.
    """
    TCPServer(addr=addr,
              sockmodgen=sockmodgen,
              request_handler=micro_stream_request_handler)

class TCPClient():
    """Minimal RPC client speaking the length-prefixed framing of this file.

    The connection is opened lazily by rpc_call() and kept for later calls.

    NOTE(review): rpc_call() has no serialisation around its send/receive
    pair, so calls issued at the same time on one instance share a single
    socket and may read each other's bytes.  common_client appears to use
    one shared instance this way -- confirm whether that is intended.
    """

    def __init__(self,
                 host='localhost',
                 port=269,
                 sockmodgen=Sock,
                 xtimemod=xtime):
        """Store the connection parameters; no socket is opened here.

        host, port: address of the server to contact.
        sockmodgen: callable returning a socket-module-like object.
        xtimemod: module providing swait(), used between connect retries.
        """
        self.host = host
        self.port = port
        self.xtime = xtimemod
        self.sockfactory = sockmodgen

        # Always defined, so close() and __del__ are safe on an instance
        # that never connected.
        self.socket = None
        self.connected = False

    def rpc_call(self, args):
        """Send *args* to the server and return the decoded reply.

        Connects first if necessary, retrying after each failed attempt.
        Raises Exception if the connection is closed before a reply
        arrives; the dead connection is dropped so that the next call
        reconnects instead of reusing it.
        """
        while not self.connected:
            self.connect()
            if not self.connected:
                self.xtime.swait(500)

        data = rencode.dumps(args)
        self.socket.sendall(_data_pack(data))

        recv_encoded_data = _data_unpack_from_stream_socket(self.socket)
        if not recv_encoded_data:
            self.close()
            raise Exception('rpc_call: connection closed before reply')
        return rencode.loads(recv_encoded_data)

    def connect(self):
        """Try once to connect to (host, port).

        On success self.connected becomes True.  On socket.error the error
        is printed, the failed socket is closed, and self.connected stays
        False.
        """
        socket = self.sockfactory()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket = sock
        try:
            sock.connect((self.host, self.port))
        except socket.error as e:
            print('socket.error while connecting -> %s' % (e,))
            # Do not leave the failed socket behind: each retry creates a
            # new one.
            sock.close()
            if self.socket is sock:
                self.socket = None
        else:
            self.connected = True

    def close(self):
        """Close the current socket, if any, and mark the client disconnected."""
        # getattr: __del__ may run on an instance whose __init__ did not finish.
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()
        self.connected = False

    def __del__(self):
        self.close()

# Number of RPC calls that returned a reply; incremented by both client types.
executed = 0

@microfunc(True)
def common_client():
    """Issue one long request through the shared module-level theClient,
    print the reply and count the call."""
    global executed

    reply = theClient.rpc_call('common request ' * 20) # a long string again...
    print(reply)
    executed += 1

@microfunc(True)
def unique_client():
    """Create a private TCPClient for port 268, issue two long requests in
    sequence, print each reply and count each completed call."""
    global executed

    client = TCPClient('localhost', 268)
    for request in ('first request ' * 20, 'second request ' * 20):
        print(client.rpc_call(request))
        executed += 1

pause = 1 # how many seconds
times = 3 # how many times

# Start one server per port, then wait a second before any client is used.
print('start')
MicroTCPServer(('', 269))
print('started a server')
MicroTCPServer(('', 268))
print('started another server')
xtime.swait(1000)
print('after 1 second')

# The shared client uses the TCPClient defaults (localhost, port 269).
print('Instantiate common client...')
theClient = TCPClient()
print('done.')

# Each round launches two private clients and two requests on the shared
# client, then waits `pause` seconds before the next round.
for i in xrange(times):
    print('launch a unique client')
    unique_client()
    print('launch a msg from common client')
    common_client()
    print('launch a unique client')
    unique_client()
    print('launch a msg from common client')
    common_client()

    xtime.swait(pause * 1000)
    print('after a wait of ' + str(pause) + ' seconds')


# Give outstanding requests time to finish before reporting the total.
print('waiting...')
xtime.swait(10000)
print('after a wait of 10 seconds, executed = ' + str(executed))

_______________________________________________
Netsukuku mailing list
[email protected]
http://lists.dyne.org/mailman/listinfo/netsukuku

Reply via email to