Whoops, forgot to attach the files.  Lets see if this works :)

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"TurboGears" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to [EMAIL PROTECTED]
For more options, visit this group at http://groups.google.com/group/turbogears
-~----------~----~----~----~------~----~------~--~---
import cherrypy
from cherrypy._cpwsgi import wsgiApp

import wsgiserver
from wsgiserver import HybrideWSGIServer,  fix_thread_local

# Monkeypatch wsgiserver.handle_request to handle cherrypy's threadlocal 
objects.
wsgiserver.handle_request =  
fix_thread_local(cherrypy.serving)(wsgiserver.handle_request)

class CherrypyWSGIServer(HybrideWSGIServer):
    """ A wrapper around WSGIServer to handle configuration."""

    def __init__(self):
        conf = cherrypy.config.get

        bind_addr = (conf("server.socket._host") or '', 
conf("server.socket_port"))
        num_threads = conf("server.thread_pool")
        server_name = conf("server.socket_host")
        backlog = conf("server.socket_queue_size") or 5
        granny = conf("server.event_granularity") or 0.1

        HybrideWSGIServer.__init__(self, bind_addr, wsgiApp, num_threads, 
server_name,
                                   request_queue_size=backlog, 
event_granularity = granny)
                                           
#!C:\Python24\python.exe
import pkg_resources
pkg_resources.require("TurboGears")

import turbogears
import cherrypy
cherrypy.lowercase_api = True

from os.path import *
import sys

# first look on the command line for a desired config file,
# if it's not on the command line, then
# look for setup.py in this directory. If it's not there, this script is
# probably installed
if len(sys.argv) > 1:
    turbogears.update_config(configfile=sys.argv[1], 
        modulename="scribble.config")
elif exists(join(dirname(__file__), "setup.py")):
    turbogears.update_config(configfile="dev.cfg",
        modulename="scribble.config")
else:
    turbogears.update_config(configfile="prod.cfg",
        modulename="scribble.config")

from scribble.controllers import Root
cherrypy.root = Root()

from cpwsgiserver import CherrypyWSGIServer

cherrypy.server.start(initOnly=True, serverClass=CherrypyWSGIServer)

s = CherrypyWSGIServer() 

try:
    s.start()
except:
    s.stop()
    raise
""" A WSGI server that supports simple `continuations`


This is an (experimental) WSGI server that builds on the threaded cherrypy WSGI
server. It adds an option to 'suspend' requests that are not ready to produce
content. A WSGI app can do so by yielding an empty string (''). The server will
use this indicator to tranfer the request from the pre-emetive threadpool into a
'microthread' pool. This frees the pre-emptive thread to work on active 
requests.
As a result the server can handle a resonable number of pending request and 
still
be responsive to active requests.


Example usage:
--------------
The server will keep polling the app after it has moved the request out of the 
pre-emptive threadpool. One can use the `while some_nonblocking_predicate: 
yield '' `
construct to keep the app in a `suspended` state. 

>>> import time
>>> def timeout_after(delay)
>>>     end = time.clock() + delay
>>>     return lambda: (time.clock() < end) 
>>>
>>> def SlowWSGIApp(environ, start_response):
>>>     response_headers = [('Content-type','text/plain')]
>>>     start_response("200 OK", response_headers)
>>>     ready = timeout_after(5)
>>>     while not ready(): yield ''
>>>     yield 'Hello world!'
>>>       
>>> s = HybridWSGIServer(('',8080), SlowWSGIApp, numthreads=1)
>>> s.start()


Note on usage in combination with a frameworks:
-----------------------------------------------
Most web frameworks use threadlocal objects to store requests specific 
information in the framework namespace. The fix_thread_local decorator
makes sure that thread local objects are transfered between threads. E.g,

>>> import cherrypy
>>> handle_request = fix_thread_local(cherrypy.serving)(handle_request)
      
There might also be other threadsafty issues specific to your framework. 


Note on peformance:
-------------------
The goal of this server is to quickly build a `comet` enabled server that 
integrates 
with TurboGears. You could use this server to test higher level abstraction to 
do messages
routing and such. 
 
This server would however, would be a bad solution for public/ potential high 
traffic
services. The pending requests are continuously polled for state changes, thus 
greatly
limiting the maximum number of pending connections it can sustain. E.g, my 
development
notebook can do some 30k yields in 0.1 sec. Given the fact that the app will 
need to
check at least one predicate per connection this would limit the performance at 
15k
connections. Apps also need cpu time to do something usefull, thus actual 
performance
will be much lower. A simple chat application implemented using the TurboGears 
framework
has been tested with 1024 connections resulting in 65% cpu utilization. 

The WSGI 1.0 spec has limited support for async applications. The spec however, 
does allow 
a custom server extenstion-api which could be used to to hook event triggers 
into the  
server eventloop. This would remove the wastefull polling of the whole app 
stack. Furthermore 
when deployed on a potenially slow internet it would be wise to switch from 
threaded  to 
non-blocking or even asynchronous io. So, as one can see, there is plenty of 
room to improve 
performance. Futhermore it would be wise to look into the Twisted framework. 
Twisted has alot
of advanced async IO code which can be leveraged to build high peformance 
servers.


Copyright (c) 2006 Joost Moesker
Published under the MIT License

"""

from copy import copy

from cherrypy import _cpwsgiserver
from cherrypy._cpwsgiserver import *
      
_NO_THREAD, _THREAD = "_not_thread", "_thread"
_SHUTDOWNREQUEST = None

# generic helpers:
def watchdog( func, callback, threshold=0):
    """ Monitor execution time and make a callback if it exceeded the 
threshold."""
    import time
    def wrapper(*arg, **kw):
        start = time.clock()
        results = func(*arg, **kw)
        et = time.clock() - start
        if et > threshold:
           callback({'callable':func, 'execution_time' :et})
        return results
    return wrapper

def fix_thread_local(thread_local):
    """Task decorator that ensures that a thread local moves between threads."""
    def decorator(task):
        def wrapper(*arg, **kw):
            for i in task(*arg, **kw):
                serving = thread_local.__dict__.copy()
                thread_local.__dict__.clear()
                yield i
                thread_local.__dict__.update(serving)
        return wrapper
    return decorator

def handle_request(request):
    """ Drives the request after it has been accepted
    
    This `coroutine` handles the request after it has been accepted by the 
server.
    It controls in how a code section will be executed. Yielding _THREAD will
    make sure that the next code segment will be executed in a pre-emptive
    thread. Yielding _NO_THREAD yields control back to the `microthread` 
scheduler.

    """
    #NOTE: parse.request calls rfile.readline which can block, so requests must 
be started up in a thread
    #yield _THREAD
    request.parse_request()
    
    if not request.ready:
        request.terminate()
        raise StopIteration
   
    response = request.wsgi_app( request.environ, request.start_response)
    
    for data in response:
        if data == '':
            yield _NO_THREAD
        else:
            yield _THREAD
            try:
                request.write(data)
            except (KeyboardInterrupt, SystemExit):
                request.terminate() # lets play nice and close the connection
                raise
            except Exception, e:
                if len(e.args) and e.args[0] in socket_errors_to_ignore: break
                traceback.print_exc()
                break
            
    if hasattr(response, "close"):
        response.close()
    request.terminate()

       
class WorkerThread(threading.Thread):
    """A worker thread that dispatches a task depending on its 'state'. """
    def __init__(self, server):
        self.ready = False
        self.server = server
        self.task_queue = server.task_queue
        self.task_list = server.task_list
        threading.Thread.__init__(self)
    
    def run(self):
        self.ready = True
        task_list, task_queue = self.task_list, self.task_queue
        while True:
            task = self.task_queue.get()
            if task is _SHUTDOWNREQUEST:
                return
            try:
                state = task.next()
                # Dispatch the next step:    
                if state is _THREAD: task_queue.put(task) # XXX: seems 
wastefull why not continue in the same thread...
                if state is _NO_THREAD: task_list.append(task)
            except StopIteration:
                if task in task_list: task_list.remove(task)
            except (KeyboardInterrupt, SystemExit), exc:
                self.server.interrupt = exc
#XXX: monkey patch the orginial WorkerThread class in _cpwsgisever
_cpwsgiserver.WorkerThread = WorkerThread


class HybrideWSGIServer(CherryPyWSGIServer):
    """ WSGI server that can handle longlived HTTP connections.

   Important parameters:

    * ``bind_addr`` - A (hostname, port) tuple on which the server should 
listen.

    * ``wsgi_app``  - The WSGI Application e.g, cherrypy._cpwsgi.wsgiApp

    * ``numthreads`` - Numer of worker threads in the threadpool (default=10)

    * ``server_name`` - Name of the server (default=None)

    * ``max`` - Size of the thread pool queue size (default=-1, unlimited)

    * ``request_queue`` - Maximum backlog on the listing socket (default=5)

    * ``timeout`` - Default socket timeout (default=10)

    * ``event_granularity`` - Time between application state checks 
(default=0.1)
    
    """
    
    ready = False
    interrupt = None
    ProtocolHandlerClass = HTTPRequest
    
    def __init__(self, bind_addr, wsgi_app, numthreads=10, server_name=None,
                 max=-1, request_queue_size=5, timeout=10, 
event_granularity=0.1):
        """Be careful w/ max."""
        CherryPyWSGIServer.__init__(self, bind_addr, wsgi_app, numthreads=10,
                                    server_name=None, max=-1, 
request_queue_size=5,
                                    timeout=10 )
        
        self.task_list = []
        self.task_queue = self.requests
        self.event_granularity = event_granularity
    
    def start(self):
        """Run the server forever."""
        # Create a thread that runs the dispatch loop:
        t = threading.Thread(target=self.dispatch_loop, 
args=[self.event_granularity])
        t.start()

        CherryPyWSGIServer.start(self)
        
    def tick(self):
        """Accepts incomming connections and dispatch a task to the 
task_queue"""
        try:
            s, addr = self.socket.accept() #seems to be blocking...
            if hasattr(s, 'settimeout'):
                s.settimeout(self.timeout)
            request = self.ProtocolHandlerClass(s, addr, self)
            task = handle_request( request)
            self.task_queue.put(task) # start out in a thread
        except socket.timeout:
                # The only reason for the timeout in start() is so we can
                # notice keyboard interrupts on Win32, which don't interrupt
                # accept() by default
            return
        except (KeyboardInterrupt, SystemExit), exc:
            self.interrupt = exc
            

    def dispatch_loop(self, granularity=0.1):
        """ Polling loop to handle pending requests."""
        task_list, task_queue = self.task_list, self.task_queue
        try:
            while True:
                start = time.clock()
                task_list_copy = copy(self.task_list) # workerthreads can 
modify the tasklist concurently, so we make shallow copy
                for task in task_list_copy:
                    if task is _SHUTDOWNREQUEST:
                        return
                    try:
                        state = task.next()
                        # Dispatch the next step:    
                        if state is _THREAD:
                            task_list.remove(task)
                            task_queue.put(task)
                        #elif _NO_THREAD: pass      
                    except StopIteration:
                        if task in task_list: task_list.remove(task)
                        
                delay = time.clock() - start
                time.sleep( max( (granularity - delay), 0))
        except (KeyboardInterrupt, SystemExit):
            self.interrupt = exc

    def stop(self):
        """Gracefully shutdown a server that is serving forever."""
        self.task_list.append(_SHUTDOWNREQUEST)
        CherryPyWSGIServer.stop(self)

Reply via email to