Please ignore my previous email. I fixed it and it seems to work
great (attached).
It is very much modeled on wsgiserver.
It fits in 262 lines of code, uses regex, and is a little bit faster
than wsgiserver, but
it does not perform all the checks that wsgiserver performs (do we need
them?).
I am still trying to figure out if I am missing something important.
Massimo
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"web2py-users" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/web2py?hl=en
-~----------~----~----~----~------~----~------~--~---
"""
Experimental multi-threaded web server created by Massimo Di Pierro
License: GPL2
This code would not have been possible without CherryPy wsgiserver, a great example of a Python web server.
"""
import threading
import socket
import logging
import sys
import Queue
import re
import errno
import rfc822
import cStringIO
import signal
import time
import traceback
import copy
# Buffer size handed to socket.makefile() for the per-connection request stream.
BUF_SIZE = 10000
# Fallback server name used when the server is created without one.
SERVER_NAME = 'web2py'
ACTUAL_SERVER_PROTOCOL = 'HTTP/1.1'  # should this be determined from request?

# The patterns below are raw strings so that the backslash escapes reach the
# regex engine untouched; the compiled patterns are unchanged.

# Request line: "<method> <uri> <protocol>", optionally preceded by a
# "scheme://host" prefix.
regex_head = re.compile(
    r'^((http|https|HTTP|HTTPS)\://[^/]+)?(?P<method>\w+)\s+(?P<uri>\S+)\s+(?P<protocol>\S+)')
# Header line: "<key>: <value>" with surrounding whitespace stripped.
regex_header = re.compile(r'\s*(?P<key>.*?)\s*\:\s*(?P<value>.*?)\s*$')
# Leading run of word characters on a chunk header line (the hex chunk size).
regex_chunk = re.compile(r'^(?P<size>\w+)')
class ChunkedReader:
    """File-like reader that decodes a chunked (Transfer-Encoding) body.

    ``stream`` must provide ``readline()`` and ``read(size)``.  The wire
    format consumed is::

        <hex size>[;extensions]CRLF <size bytes of data> CRLF
        ...
        0 CRLF [trailer lines] CRLF

    Payload is read straight from ``stream`` as it is requested, so a large
    chunk is never buffered in memory as a whole.
    """

    def __init__(self, stream):
        self.stream = stream
        self.buffer_size = 0   # declared size of the chunk currently being read
        self._remaining = 0    # bytes of the current chunk not yet returned
        self._eof = False      # set once the terminating zero-size chunk is seen

    def chunk_read(self):
        """Parse the next chunk header if the current chunk is used up.

        Does nothing while the current chunk still has unread data or after
        the end of the body has been reached.

        Raises:
            ValueError: if the chunk-size field is missing, negative or not
                hexadecimal (this includes a stream that ends early).
        """
        if self._eof or self._remaining:
            return
        header = self.stream.readline()
        # Anything after ';' is a chunk extension and is ignored.
        size = int(header.split(';', 1)[0].strip(), 16)
        if size < 0:
            raise ValueError('negative chunk size: %r' % header)
        self.buffer_size = self._remaining = size
        if not size:
            self._eof = True
            # Swallow optional trailer lines up to and including the blank
            # line that ends the body, so the stream is left positioned at
            # whatever follows the chunked message.
            while self.stream.readline().strip():
                pass

    def read(self, size=-1):
        """Return up to ``size`` decoded bytes; everything left if ``size`` < 0.

        Returns an empty string once the body is exhausted.
        """
        if size is None or size < 0:
            size = -1
        pieces = []
        while size:
            self.chunk_read()
            if self._eof:
                break
            wanted = self._remaining
            if 0 < size < wanted:
                wanted = size
            piece = self.stream.read(wanted)
            if not piece:
                # The stream ended inside a chunk: return what was received.
                self._eof = True
                break
            pieces.append(piece)
            # Account for what was actually returned, not what was asked for.
            self._remaining -= len(piece)
            if size > 0:
                size -= len(piece)
            if not self._remaining:
                # Consume the CRLF that terminates the chunk data; without
                # this the next chunk header would be read as an empty line.
                self.stream.readline()
        return ''.join(pieces)

    def readline(self):
        """Return the next decoded line, including its trailing newline."""
        chars = []
        while True:
            char = self.read(1)
            if not char:
                break
            chars.append(char)
            if char == '\n':
                break
        return ''.join(chars)

    def readlines(self, hint=None):
        """Yield every remaining decoded line; ``hint`` is accepted and ignored."""
        while True:
            line = self.readline()
            if not line:
                break
            yield line

    def __iter__(self):
        """Iterate over the remaining decoded lines."""
        return self.readlines()
def errors_numbers(errnames):
    """Translate errno names into the set of numeric codes this platform has.

    Names that the ``errno`` module does not define are skipped.
    """
    numbers = set()
    for name in errnames:
        if hasattr(errno, name):
            numbers.add(getattr(errno, name))
    return numbers
# Numeric errno codes of socket errors that are not re-raised when sending a
# response fails.  Each name is looked up in the errno module; names that this
# platform does not define are dropped by errors_numbers().
socket_errors_to_ignore = errors_numbers([
    "EPIPE",
    "EBADF",
    "WSAEBADF",
    "ENOTSOCK",
    "WSAENOTSOCK",
    "ETIMEDOUT",
    "WSAETIMEDOUT",
    "ECONNREFUSED",
    "WSAECONNREFUSED",
    "ECONNRESET",
    "WSAECONNRESET",
    "ECONNABORTED",
    "WSAECONNABORTED",
    "ENETRESET",
    "WSAENETRESET",
    "EHOSTDOWN",
    "EHOSTUNREACH",
])
class Worker(threading.Thread):
    """Pool thread that serves HTTP connections taken from a shared queue.

    The accepting server puts ``(client_socket, client_address)`` pairs on
    ``Worker.queue``.  A pair whose socket is false, e.g. ``(None, None)``,
    tells the worker that receives it to exit.  Every request is passed to
    the WSGI application stored in ``wsgi_apps[0]``.
    """

    queue = Queue.Queue()    # accepted connections, shared by all workers
    threads = set()          # registry of workers; each removes itself on exit
    wsgi_apps = []           # wsgi_apps[0] is the application that is called
    server_name = None       # default "Server" header; falls back to SERVER_NAME
    min_threads = 1          # stored by the server; not read in this class
    # Off by default: chunked request bodies are handed to the application
    # undecoded and responses are never chunked.  Set to True to decode
    # chunked requests with ChunkedReader and to chunk HTTP/1.1 responses
    # that carry no Content-Length.
    enable_chunking = False

    # Per-request state; the class-level values are the defaults before the
    # first request has been parsed.
    connection = 'close'
    chunked_request = False
    chunked_response = False
    headers_sent = False

    def die(self):
        """Remove this worker from the shared ``threads`` registry."""
        self.threads.remove(self)
        return

    def run(self):
        """Serve queued connections until a shutdown sentinel is received."""
        while True:
            (self.client_socket, self.client_address) = self.queue.get()
            if not self.client_socket:
                return self.die()
            self.wsgi_file = None
            self.headers_sent = False
            try:
                try:
                    self.wsgi_file = self.client_socket.makefile('rb', BUF_SIZE)
                    # One iteration per request; the loop continues only
                    # while the connection is being kept alive.
                    while self._serve_request():
                        pass
                except Exception:
                    # A client that timed out or went away is not a server
                    # error: drop the connection without logging or replying.
                    if not self._client_gone(sys.exc_info()[1]):
                        logging.error(str(traceback.format_exc()))
                        self.try_error_response()
            finally:
                # The stream is closed once per connection, after the last
                # request, so keep-alive requests can still read from it.
                self._close_connection()

    def _serve_request(self):
        """Handle one request; return True if the connection stays open."""
        self.headers_sent = False
        self.connection = 'close'
        try:
            if not self.build_environ(self.wsgi_file, self.client_address):
                return False   # the peer closed the connection
        except ValueError:
            logging.warning(str(sys.exc_info()[1]))
            self.try_error_response('400 Bad Request')
            return False
        body = self.wsgi_apps[0](self.environ, self.start_response)
        try:
            for data in body:
                # An empty chunked block would be read as the end-of-body
                # marker, so empty strings are never written.
                if data:
                    self._send(data)
        finally:
            # PEP 333: the server must call close() on the iterable if present.
            if hasattr(body, 'close'):
                body.close()
        if self.chunked_response and self.headers_sent:
            self.client_socket.sendall('0\r\n\r\n')
        return self.connection.lower() == 'keep-alive'

    def _send(self, data):
        """Write one block of body data, chunk-framed when chunking is on."""
        if self.chunked_response:
            data = '%x\r\n%s\r\n' % (len(data), data)
        self.client_socket.sendall(data)

    def _client_gone(self, error):
        """Tell whether ``error`` is a timeout or an ignorable socket error."""
        if isinstance(error, socket.timeout):
            return True
        return (isinstance(error, socket.error) and bool(error.args)
                and error.args[0] in socket_errors_to_ignore)

    def _close_connection(self):
        """Close the request stream and the client socket, ignoring errors."""
        for resource in (self.wsgi_file, self.client_socket):
            if resource is None:
                continue
            try:
                resource.close()
            except socket.error:
                pass   # the connection is already broken

    def build_environ(self, input_file, client_address):
        """Parse the next request from ``input_file`` into ``self.environ``.

        Args:
            input_file: file-like request stream (``readline`` is used here;
                the object itself becomes ``wsgi.input``).
            client_address: ``(host, port)`` pair of the peer.

        Returns:
            True when a request was parsed, False when the stream ended
            before a request line arrived (the peer closed the connection).

        Raises:
            ValueError: if the request line cannot be parsed.
            Exception: if Transfer-Encoding is present and is not "chunked".
        """
        first_line = input_file.readline()
        # RFC 2616 section 4.1: ignore empty lines before the request line.
        while first_line in ('\r\n', '\n'):
            first_line = input_file.readline()
        if not first_line:
            return False
        match = regex_head.match(first_line)
        if not match:
            raise ValueError('malformed request line: %r' % first_line)
        request_method = match.group('method')
        uri = match.group('uri')
        request_protocol = match.group('protocol')
        k = uri.find('?')
        if k < 0:
            k = len(uri)
        (path_info, query_string) = (uri[:k], uri[k + 1:])
        # NOTE(review): SERVER_NAME and SERVER_PORT, which PEP 333 also
        # requires, are not set because the bound address is not available
        # in this class.
        environ = self.environ = {
            'wsgi.version': (1, 0),
            'wsgi.input': input_file,
            'wsgi.url_encoding': 'utf-8',
            'wsgi.url_scheme': 'http',
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': True,
            'wsgi.multiprocess': False,
            'wsgi.run_once': False,
            'ACTUAL_SERVER_PROTOCOL': ACTUAL_SERVER_PROTOCOL,
            'SERVER_PROTOCOL': request_protocol,
            'CLIENT_ADDR': client_address[0],
            'CLIENT_PORT': client_address[1],
            'REMOTE_ADDR': client_address[0],
            'REQUEST_URI': uri,
            'REQUEST_METHOD': request_method,
            'PATH_INFO': path_info,
            'SCRIPT_NAME': '',
            'QUERY_STRING': query_string,
        }
        while True:
            line = input_file.readline()
            # A blank line ends the header section; EOF ends it as well.
            if not line or line in ('\r\n', '\n'):
                break
            match = regex_header.match(line)
            if not match:
                continue
            key = match.group('key').upper().replace('-', '_')
            environ['HTTP_' + key] = match.group('value')
        # CGI names these two without the HTTP_ prefix; the prefixed copies
        # are kept as well.
        for name in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
            if 'HTTP_' + name in environ:
                environ[name] = environ['HTTP_' + name]
        request_encoding = environ.get('HTTP_TRANSFER_ENCODING', None)
        if request_encoding and request_encoding != 'chunked':
            raise Exception("Not supported")
        self.chunked_request = bool(self.enable_chunking and request_encoding)
        self.chunked_response = bool(self.enable_chunking
                                     and request_protocol == 'HTTP/1.1'
                                     and request_method != 'HEAD')
        if self.chunked_request:
            environ['wsgi.input'] = ChunkedReader(input_file)
        return True

    def start_response(self, status, headers, exc_info=None):
        """WSGI ``start_response``: send the status line and headers now.

        ``Date``, ``Server`` and ``Connection`` headers are appended to
        ``headers`` (the list is modified in place).  The connection is kept
        alive only when the client asked for it, the response length is
        delimited (Content-Length or chunking) and the request carried no
        body that could be left unread on the stream.

        Args:
            status: status string such as ``'200 OK'``.
            headers: list of ``(name, value)`` tuples.
            exc_info: optional ``sys.exc_info()`` tuple from an application
                error handler.

        Returns:
            The ``write(data)`` callable required by PEP 333.

        Raises:
            The exception in ``exc_info``, or RuntimeError, when headers
            have already been sent for this request.
        """
        if self.headers_sent:
            if exc_info is not None:
                raise exc_info[1]
            raise RuntimeError('start_response() called after headers were sent')
        header_keys = [name.lower() for (name, value) in headers]
        if 'date' not in header_keys:
            headers.append(('Date', rfc822.formatdate()))
        if 'server' not in header_keys:
            headers.append(('Server', self.server_name or SERVER_NAME))
        has_length = 'content-length' in header_keys
        if self.chunked_response and (has_length
                                      or status[:3] in ('204', '304')
                                      or status[:1] == '1'):
            # A declared length or a body-less status rules out chunking.
            self.chunked_response = False
        if self.chunked_response:
            headers.append(('Transfer-Encoding', 'chunked'))
        environ = self.environ
        reusable = (environ.get('HTTP_CONNECTION', '').lower() == 'keep-alive'
                    and (has_length or self.chunked_response)
                    and not environ.get('HTTP_TRANSFER_ENCODING')
                    and environ.get('CONTENT_LENGTH', '').strip() in ('', '0'))
        if reusable:
            self.connection = 'Keep-Alive'
        else:
            self.connection = 'close'
        headers.append(('Connection', self.connection))
        self.response_headers = headers
        serialized_headers = ''.join(['%s: %s\r\n' % (k, v) for (k, v) in headers])
        # Flag first: after a partial send an error response must not follow.
        self.headers_sent = True
        self.client_socket.sendall("HTTP/1.1 %s\r\n%s\r\n" % (status, serialized_headers))
        return self._send

    def try_error_response(self, status="500 Internal Server Error"):
        """Best-effort empty error response; never raises.

        Nothing is sent when response headers have already gone out, since
        a second status line would corrupt the stream.
        """
        if self.headers_sent:
            return
        try:
            self.client_socket.sendall(
                "%s %s\r\nContent-Length: 0\r\nContent-Type: text/plain\r\n"
                "Connection: close\r\n\r\n" % (ACTUAL_SERVER_PROTOCOL, status))
        except Exception:
            pass   # deliberate: the connection may already be unusable
class web2pyWSGIServer:
    """Accept TCP connections and hand them to a pool of ``Worker`` threads.

    Args:
        bind_addr: ``'host:port'`` string or ``(host, port)`` sequence.
        wsgi_app: WSGI application; appended to ``Worker.wsgi_apps`` (the
            workers call the first entry of that list).
        numthreads: number of workers created up front; also the step by
            which the pool grows and shrinks.
        server_name: value for the default ``Server`` header; falls back to
            ``SERVER_NAME``.
        max_threads: the pool only grows while it is smaller than this, so
            the default of -1 means it never grows.
        request_queue_size: backlog passed to ``listen()``.
        timeout: socket timeout, in seconds, set on every accepted
            connection.
        shutdown_timeout: seconds after ``stop()`` before SIGALRM forces
            ``sys.exit(1)``.
    """

    def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
                 max_threads=-1, request_queue_size=5, timeout=10, shutdown_timeout=5):
        if isinstance(bind_addr, str):
            # Split on the last colon so that a host containing colons
            # keeps them.
            bind_addr = bind_addr.rsplit(':', 1)
        self.address = bind_addr[0]
        self.port = bind_addr[1]
        self.min_threads = numthreads
        self.max_threads = max_threads
        self.request_queue_size = request_queue_size
        self.timeout = timeout
        self.shutdown_timeout = shutdown_timeout
        self.ssl_interface = None
        self.socket = None        # listening socket, created by start()
        self._stopping = False    # set by stop() to end the accept loop
        self.server_name = server_name or SERVER_NAME
        Worker.wsgi_apps.append(wsgi_app)
        # Publish the resolved name, not the raw argument, which may be None.
        Worker.server_name = self.server_name
        Worker.min_threads = self.min_threads
        Worker.threads.update([Worker() for k in range(self.min_threads)])

    def start(self):
        """Bind, listen and run the accept loop until stopped.

        Returns when the loop ends (KeyboardInterrupt, ``stop()`` or an
        unexpected error), after calling ``stop()``.

        Raises:
            socket.error: if no socket can be created or bound.
        """
        self.socket = None
        self._stopping = False
        sa = None
        for res in socket.getaddrinfo(self.address, self.port, socket.AF_UNSPEC,
                                      socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
            af, socktype, proto, canonname, sa = res
            try:
                self.socket = socket.socket(af, socktype, proto)
                break
            except socket.error:
                continue
        if self.socket is None:
            raise socket.error('unable to create a listening socket for %s:%s'
                               % (self.address, self.port))
        # Allow a quick restart while old connections linger in TIME_WAIT.
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(sa)
        self.socket.listen(self.request_queue_size)
        if self.ssl_interface:
            self.socket = self.ssl_interface(self.socket)
        # Workers start only once the socket is listening, so a failed bind
        # does not leave worker threads running.
        for thread in list(Worker.threads):
            thread.start()
        try:
            while not self._stopping:
                try:
                    (client_socket, client_address) = self.socket.accept()
                except KeyboardInterrupt:
                    return self.stop()
                except Exception:
                    if self._stopping:
                        break   # the listening socket was closed by stop()
                    logging.error(str(sys.exc_info()[1]))
                    continue
                if hasattr(client_socket, 'settimeout'):
                    client_socket.settimeout(self.timeout)
                Worker.queue.put((client_socket, client_address))
                pool_size = len(Worker.threads)
                if not Worker.queue.empty() and pool_size < self.max_threads:
                    # Grow by up to min_threads without passing max_threads.
                    for k in range(min(self.min_threads,
                                       self.max_threads - pool_size)):
                        new_worker = Worker()
                        Worker.threads.add(new_worker)
                        new_worker.start()
                elif Worker.queue.empty() and pool_size > self.min_threads:
                    # Retire up to min_threads workers, never going below
                    # min_threads; each sentinel stops one worker.
                    for k in range(min(self.min_threads,
                                       pool_size - self.min_threads)):
                        Worker.queue.put((None, None))
        except Exception:
            logging.error(str(traceback.format_exc()))
        return self.stop()

    def kill(self, status, frame):
        """Signal handler installed by ``stop()``: exit the process."""
        sys.exit(1)

    def stop(self):
        """Ask every worker to exit and close the listening socket.

        Where SIGALRM exists, an alarm is armed so that ``kill()`` ends the
        process after ``shutdown_timeout`` seconds.
        """
        self._stopping = True
        if hasattr(signal, 'SIGALRM'):
            try:
                signal.signal(signal.SIGALRM, self.kill)
                signal.alarm(self.shutdown_timeout)
            except ValueError:
                pass   # handlers can only be installed from the main thread
        # One sentinel per registered worker.
        for thread in list(Worker.threads):
            Worker.queue.put((None, None))
        if self.socket is not None:
            try:
                self.socket.close()
            except socket.error:
                pass
            self.socket = None
        return
def test_wsgi_app(environ, start_response):
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
# return [open(environ['PATH_INFO'],'rb').read()]
return ['hello world!\n']
#logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.WARN)

if __name__ == '__main__':
    # Usage: python <this file> PORT
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s PORT\n' % sys.argv[0])
        sys.exit(2)
    # The constructor's keyword for the initial pool size is ``numthreads``;
    # it has no ``min_threads`` parameter.
    s = web2pyWSGIServer('127.0.0.1:' + sys.argv[1], test_wsgi_app,
                         numthreads=5, max_threads=10)
    print('starting!')
    s.start()