I am trying to let netsukuku work on a virtual network on netkit.
I am experimenting strange behavior and errors, that I think are
coming from microsock module.
So, I created a little test. I built it with the core parts of module
rpc.py, to test the underneath modules micro, microsock, and the like.
(BTW I don't get why stream_request_handler uses sock.send while
TCPClient.rpc_call uses self.socket.sendall)
The test creates many tasklets and makes simultaneous connections.
When I run it with delays of 5 seconds it works as expected. When I
run it with shorter delays (1 second) it produces many times the
following message:
error: uncaptured python exception, closing channel
<ntk.lib.microsock.dispatcher connected at 0x82a7e4c> (<type
'exceptions.MemoryError'>:
[/opt/stackless/lib/python2.6/asyncore.py|read|74]
[/opt/stackless/lib/python2.6/asyncore.py|handle_read_event|413]
[../ntk/lib/microsock.py|handle_read|306]
[/opt/stackless/lib/python2.6/asyncore.py|recv|362])
Please, have a look at the test I attach, let me know if this behavior
is expected with this type of usage, or else the microsock module
needs some debug.
--Luca
import struct
import sys
sys.path.append('..')
import ntk.lib.rencode as rencode
from ntk.lib.micro import micro, microfunc, allmicro_run, Channel
from ntk.wrap.sock import Sock
import ntk.wrap.xtime as xtime
import ntk.lib.logger as logging
_data_hdr_sz = struct.calcsize("I")
def _data_pack(data):
return struct.pack("I", len(data)) + data
def _data_unpack_from_stream_socket(socket):
readBuffer = ""
while True:
rawPacket = socket.recv(_data_hdr_sz-len(readBuffer))
if not rawPacket:
return ""
readBuffer += rawPacket
if len(readBuffer) == _data_hdr_sz:
dataLength = struct.unpack("I", readBuffer)[0]
readBuffer = ""
while len(readBuffer) != dataLength:
rawPacket = socket.recv(dataLength - len(readBuffer))
if not rawPacket:
return ""
readBuffer += rawPacket
return readBuffer
def stream_request_handler(sock, clientaddr):
while True:
data = _data_unpack_from_stream_socket(sock)
if not data: break
response = rencode.dumps('result ' * 20) # a long string
sock.send(_data_pack(response))
sock.close()
def micro_stream_request_handler(sock, clientaddr):
micro(stream_request_handler, (sock, clientaddr))
def TCPServer(addr=('', 269), sockmodgen=Sock, request_handler=stream_request_handler):
socket=sockmodgen()
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(8)
while 1:
sock, clientaddr = s.accept()
request_handler(sock, clientaddr)
@microfunc(True)
def MicroTCPServer(addr=('', 269), sockmodgen=Sock):
TCPServer(addr, sockmodgen, micro_stream_request_handler)
class TCPClient():
def __init__(self,
host='localhost',
port=269,
sockmodgen=Sock,
xtimemod=xtime):
self.host = host
self.port = port
self.xtime = xtimemod
self.sockfactory = sockmodgen
self.connected = False
def rpc_call(self, args):
while not self.connected:
self.connect()
if not self.connected:
self.xtime.swait(500)
data = rencode.dumps(args)
self.socket.sendall(_data_pack(data))
recv_encoded_data = _data_unpack_from_stream_socket(self.socket)
if not recv_encoded_data:
raise Exception, 'rpc_call: connection closed before reply'
recv_data = rencode.loads(recv_encoded_data)
return recv_data
def connect(self):
socket = self.sockfactory()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.socket.connect((self.host, self.port))
except socket.error, e:
print 'socket.error while connecting ->', e
else:
self.connected = True
def close(self):
self.socket.close()
self.connected = False
def __del__(self):
self.close()
executed = 0
@microfunc(True)
def common_client():
global executed
print theClient.rpc_call('common request ' * 20) # a long string again...
executed += 1
@microfunc(True)
def unique_client():
global executed
a = TCPClient('localhost', 268)
print a.rpc_call('first request ' * 20)
executed += 1
print a.rpc_call('second request ' * 20)
executed += 1
pause = 1 # how many seconds
times = 3 # how many times
print 'start'
MicroTCPServer(('', 269))
print 'started a server'
MicroTCPServer(('', 268))
print 'started another server'
xtime.swait(1000)
print 'after 1 second'
print 'Instantiate common client...'
theClient = TCPClient()
print 'done.'
for i in xrange(times):
print 'launch a unique client'
unique_client()
print 'launch a msg from common client'
common_client()
print 'launch a unique client'
unique_client()
print 'launch a msg from common client'
common_client()
xtime.swait(pause * 1000)
print 'after a wait of ' + str(pause) + ' seconds'
print 'waiting...'
xtime.swait(10000)
print 'after a wait of 10 seconds, executed = ' + str(executed)
_______________________________________________
Netsukuku mailing list
[email protected]
http://lists.dyne.org/mailman/listinfo/netsukuku