Brian Smith wrote:
Manlio Perillo wrote:
Fine with me but there is a *big* problem.

WSGI 2.0 "breaks" support for asynchronous applications (since you can no longer send headers from the application iterator).

WSGI 1.0 doesn't guarantee that all asynchronous applications will work
either, because it allows the WSGI gateway to wait for and buffer all
the input from the client before even calling the application callable.
And, it doesn't provide a way to read an indefinite stream of input from
the client, which is also problematic.

Anyway, please post a small example of a program that fails to work
because of these proposed changes for WSGI 2.0.

Thanks,
Brian



Attached are two working examples (I have not committed them yet,
because I'm still testing; there are some problems that I need to solve).


The `curl_client` module is a high-level interface to pycurl.

The `nginx-poll-proxy.py` script is an asynchronous WSGI application that
implements an HTTP proxy.

The `nginx-poll-sleep.py` script is a simple asynchronous WSGI
application that gets the content of an HTTP resource, using poll just
to "sleep" (suspend execution) for a fixed amount of time.


NOTE: I have also added an `ngx.sleep` extension, but I'm going to remove
it, since the same behaviour can be obtained with ngx.poll.
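For instance, a sleep can be built on top of `ngx.poll` by polling with no connection registered and simply waiting for the timeout. A minimal sketch (Python 3, so the empty string yielded to the server is `b''`); `poll_sleep` is a hypothetical helper name, and it assumes the `ngx.poll` behaviour described in the next section:

```python
def poll_sleep(environ, msec):
    """Suspend the calling WSGI generator for about `msec` milliseconds.

    Hypothetical helper built on ngx.poll: if no registered connection
    becomes ready, the poll times out and state() reports None.
    """
    state = environ['ngx.poll'](msec)
    yield b''  # the server only suspends the application on an empty yield
    conn, flags = state()
    return conn  # None on timeout, otherwise the connection that woke us


def application(environ, start_response):
    # Hypothetical usage: wait half a second, then respond.
    yield from poll_sleep(environ, 500)
    start_response('200 OK', [('Content-Type', 'text/plain')])
    yield b'slept\n'
```

The `yield from` delegation keeps the suspension point inside the application iterator, which is where the server expects it.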


An explanation of the interfaces
--------------------------------

The ngx.poll extension is based on the Python stdlib select.poll interface.

There are two constants: `ngx.WSGI_POLLIN` and `ngx.WSGI_POLLOUT`.
These are defined in the WSGI environment, but their values are known
(`0x01` and `0x04`) and can be used for bit masking.
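Because the two values occupy separate bits, they combine with `|` and are tested with `&`, much like the event masks used with `select.poll`. A small illustration (Python 3); `directions` is a hypothetical helper:

```python
WSGI_POLLIN = 0x01   # value given in the message
WSGI_POLLOUT = 0x04  # value given in the message


def directions(mask):
    """Return the directions ('read'/'write') present in a poll mask."""
    found = []
    if mask & WSGI_POLLIN:
        found.append('read')
    if mask & WSGI_POLLOUT:
        found.append('write')
    return found


both = WSGI_POLLIN | WSGI_POLLOUT       # 0x05: interested in both directions
print(directions(both))                 # ['read', 'write']
print(directions(both & ~WSGI_POLLIN))  # ['write'] (read interest cleared)
```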

The `ngx.connection_wrapper(fd)` function takes a file descriptor (as
an integer) and returns a Connection wrapper object, to be used for
later operations.


The Connection wrapper object has the following methods:
- fileno():
     return the associated socket descriptor
- register(flags):
     register the connection with the server "reactor";
     flags is a bit mask of ngx.WSGI_POLLIN and ngx.WSGI_POLLOUT
- deregister(flags=None):
     deregister the connection from the server "reactor"
- close():
     close the connection object, deregistering it from the server
     "reactor" if still active.
     XXX it could also close the socket, but this should be done by the
         client

The last function is `ngx.poll(timeout)`, with the timeout in milliseconds.
After calling it, the application *should* yield an empty string (yielding
a non-empty string results in undefined behaviour).

The WSGI application iteration will be suspended until a connection is
ready for reading or writing, or the timeout expires.

The `ngx.poll` function returns a callable that, when called, returns a
tuple with the "ready" connection object (or None if the poll timed out)
and a flag indicating whether the connection is ready for reading or writing.
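To make the control flow concrete, here is a toy gateway (Python 3) that emulates this interface with the standard `selectors` module and a socket pair. It is only a sketch under assumptions: `Connection`, `ToyGateway` and `make_app` are hypothetical names, `deregister` is simplified to always drop the whole registration, the timeout result `(None, 0)` is an assumed convention (the message only says the connection is None on timeout), and this is not how mod_wsgi for Nginx is implemented. Like mod_wsgi for Nginx, it reports a single direction per wakeup.

```python
import selectors
import socket

WSGI_POLLIN = 0x01   # value given in the message
WSGI_POLLOUT = 0x04  # value given in the message


def _events(flags):
    """Translate a WSGI_POLL* mask into a selectors event mask."""
    return ((selectors.EVENT_READ if flags & WSGI_POLLIN else 0) |
            (selectors.EVENT_WRITE if flags & WSGI_POLLOUT else 0))


class Connection:
    """Hypothetical stand-in for the ngx.connection_wrapper(fd) object."""

    def __init__(self, reactor, fd):
        self._reactor, self._fd, self._flags = reactor, fd, 0

    def fileno(self):
        return self._fd

    def register(self, flags):
        add = self._reactor.modify if self._flags else self._reactor.register
        add(self._fd, _events(flags), self)
        self._flags = flags

    def deregister(self, flags=None):
        # Simplification: any call drops the whole registration.
        if self._flags:
            self._reactor.unregister(self._fd)
            self._flags = 0

    def close(self):
        # Deregister only; closing the socket is left to the client.
        self.deregister()


class ToyGateway:
    """Drives a WSGI generator, emulating ngx.poll with a selector."""

    def __init__(self):
        self.reactor = selectors.DefaultSelector()
        self._timeout = None  # set by ngx.poll, consumed at the next yield
        self._result = (None, 0)

    def _poll(self, timeout_ms):
        self._timeout = timeout_ms
        return lambda: self._result

    def run(self, app):
        environ = {
            'ngx.connection_wrapper': lambda fd: Connection(self.reactor, fd),
            'ngx.poll': self._poll,
            'ngx.WSGI_POLLIN': WSGI_POLLIN,
            'ngx.WSGI_POLLOUT': WSGI_POLLOUT,
        }
        status, body = [], []
        def start_response(s, headers, exc_info=None):
            status.append(s)
            return body.append  # minimal write() callable

        for chunk in app(environ, start_response):
            if self._timeout is not None:
                # Suspend the app until a connection is ready or time runs out.
                ready = self.reactor.select(self._timeout / 1000.0)
                self._timeout, self._result = None, (None, 0)
                if ready:
                    key, events = ready[0]
                    # One direction per wakeup, as mod_wsgi for Nginx reports.
                    read = events & selectors.EVENT_READ
                    self._result = (key.data, WSGI_POLLIN if read else WSGI_POLLOUT)
            elif chunk:
                body.append(chunk)
        return status, b''.join(body)


def make_app(sock, timeout_ms=1000):
    """Hypothetical app: wait until `sock` is readable, then echo its data."""
    def app(environ, start_response):
        conn = environ['ngx.connection_wrapper'](sock.fileno())
        conn.register(environ['ngx.WSGI_POLLIN'])
        state = environ['ngx.poll'](timeout_ms)
        yield b''  # suspended here until a connection is ready or timeout
        ready, flags = state()
        conn.close()
        if ready is None:
            start_response('504 Gateway Timeout', [])
            return
        start_response('200 OK', [('Content-Type', 'text/plain')])
        yield sock.recv(1024)
    return app


if __name__ == '__main__':
    a, b = socket.socketpair()
    b.sendall(b'hello')
    print(ToyGateway().run(make_app(a)))  # (['200 OK'], b'hello')
```

The state callable is created before the application yields but only read after it is resumed, which is why `ngx.poll` can return it synchronously without any callback.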

NOTE: due to the internal architecture of the Nginx event module (it
       has to support several different event systems), mod_wsgi for
       Nginx will only return ngx.WSGI_POLLIN or ngx.WSGI_POLLOUT,
       *never* ngx.WSGI_POLLIN | ngx.WSGI_POLLOUT.

       Also, no error status is reported.
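Because each wakeup carries only one direction, an application that registered for both reading and writing has to poll again until it has seen every direction it needs. A minimal sketch (Python 3); `wait_for` is a hypothetical helper, it assumes the connection was already registered with the wanted flags, and raising `TimeoutError` on a timed-out poll is an arbitrary choice, since no error status is reported:

```python
WSGI_POLLIN = 0x01
WSGI_POLLOUT = 0x04


def wait_for(environ, conn, wanted, timeout_ms):
    """Poll until every direction in `wanted` was reported for `conn`."""
    seen = 0
    while (seen & wanted) != wanted:
        state = environ['ngx.poll'](timeout_ms)
        yield b''  # let the server suspend us
        ready, flags = state()
        if ready is None:
            raise TimeoutError('ngx.poll timed out')
        if ready is conn:  # ignore wakeups for other connections
            seen |= flags
    return seen
```

Testing bits with `&` also keeps the code correct on a server that does report both flags in a single wakeup.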



That's all.

An asynchronous application is simply impossible to develop with the
current draft of WSGI 2.0, since I need to send the headers after some
steps in the application iterator.
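By contrast, under WSGI 1.0 (PEP 333) a server must not transmit the headers until the application iterator yields its first non-empty string, which is what lets the proxy example call `start_response` after several empty yields. A minimal sketch of that rule (Python 3); `run_wsgi1` and `deferred_app` are hypothetical names, and this is not any particular server's code:

```python
def run_wsgi1(app, environ):
    """Drive a WSGI 1.0 app, committing headers at the first non-empty chunk."""
    pending, sent, body = [], None, []

    def start_response(status, headers, exc_info=None):
        pending[:] = [(status, headers)]
        return body.append  # minimal write() callable

    for chunk in app(environ, start_response):
        if chunk and sent is None:
            if not pending:
                raise RuntimeError('body data yielded before start_response')
            sent = pending[0]  # the point where a real server sends headers
        if chunk:
            body.append(chunk)
    if sent is None and pending:
        sent = pending[0]  # empty body: headers go out at the end
    return sent, b''.join(body)


def deferred_app(environ, start_response):
    # Some asynchronous steps happen before the response headers are known.
    for _ in range(3):
        yield b''
    start_response('200 OK', [('Content-Type', 'text/plain')])
    yield b'done'
```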


So, please, don't "ruin" the WSGI specification just to make it easier
to implement and to use.
For me, asynchronous support is very important.


P.S: I have chosen to implement this interface, instead of
      `wsgi.pause_output`, because IMHO it is very easy to implement for
      "normal" servers.

      Moreover, it is also simpler to use, with a very "natural"
      interface, and it avoids callbacks and a tighter coupling with
      the server "reactor".


Regards  Manlio Perillo

# curl_client.py
import time
from cStringIO import StringIO
from wsgiref.headers import Headers

import pycurl


# These constants are not defined in pycurl
CURL_POLL_NONE = 0
CURL_POLL_IN = 1
CURL_POLL_OUT = 2
CURL_POLL_INOUT = 3
CURL_POLL_REMOVE = 4


# These constants are defined in the WSGI environment, but their values
# are known
WSGI_POLLIN = 0x01
WSGI_POLLOUT = 0x04


class CurlClient(object):
    """Asynchronous client using pycurl.
    Supports only one connection at a time.
    """

    def __init__(self):
        # Create the multi instance
        self.multi = pycurl.CurlMulti()

        self.multi.setopt(pycurl.M_SOCKETFUNCTION, self._socket_callback)
        self.multi.setopt(pycurl.M_TIMERFUNCTION, self._timeout_callback)


    def request(self, environ, url):
        self.connection = environ['ngx.connection_wrapper']
        self.log = environ['wsgi.errors']

        self.timeout = 30000
        self.can_send_headers = False

        self.headers = Headers([])
        self.buf = StringIO()

        # map sockets to connection wrapper objects
        self.connection_map = {}

        # XXX keep the reference to the simple instance
        c = self.add_simple(url)


        self.log.write('bootstrap')
        while 1:
            ret, num_handles = self.multi.socket_all()
            self.log.write('ret: %s, num_handles: %s' % (ret, num_handles))

            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break

        self.log.write('starting')
        while num_handles:
            self.log.write('polling')
            state = environ['ngx.poll'](self.timeout)
            yield ''

            conn, flags = state()
            if conn is None:
                # timeout
                # XXX pycurl does not support curl_multi_socket_action
                raise RuntimeError('timeout')
                
            fd = conn.fileno()
            self.log.write('poll state: %s, %s' % (fd, flags))

            while 1:
                # XXX pycurl does not support curl_multi_socket_action
                ret, num_handles = self.multi.socket_all()
                self.log.write('ret: %s, num_handles: %s' % (ret, num_handles))

                if self.can_send_headers:
                    # yield so the caller has a chance to send headers
                    # NOTE: this will not be possible with the current
                    # draft of WSGI 2.0

                    yield ''

                if ret != pycurl.E_CALL_MULTI_PERFORM: 
                    break

        # XXX make sure to close all the connections
        for conn in self.connection_map.values():
            conn.close()

    def add_simple(self, url):
        # Add the easy instance
        c = pycurl.Curl()
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.HEADERFUNCTION, self._header_callback)
        c.setopt(pycurl.WRITEFUNCTION, self._write_callback)

        self.multi.add_handle(c)

        return c


    def _write_callback(self, data):
        # XXX we ignore trailer headers
        if not self.can_send_headers:
            self.can_send_headers = True

        self.log.write('received data: %d bytes' % len(data))
        self.buf.write(data)

    def _header_callback(self, header):
        self.log.write('received header: %s' % header)

        # XXX libcurl returns the end of line
        header = header.rstrip()
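        try:
            # split on the first ':' only; values such as dates and URLs
            # contain colons too
            name, value = header.split(':', 1)
        except ValueError:
            # XXX libcurl also passes the status line and the blank line
            #     that ends the header block
            return

        self.headers.add_header(name, value.strip())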

    def _socket_callback(self, action, socket, user_data, socket_data):
        self.log.write('socket callback: %d, %d' % (socket, action))

        if action == CURL_POLL_NONE:
            # nothing to do
            return
        elif action == CURL_POLL_IN:
            flags = WSGI_POLLIN
        elif action == CURL_POLL_OUT:
            flags = WSGI_POLLOUT
        elif action == CURL_POLL_INOUT:
            flags = WSGI_POLLIN | WSGI_POLLOUT
        elif action == CURL_POLL_REMOVE:
            conn = self.connection_map[socket]
            # XXX check me: mod_wsgi uses, as default, the flag passed
            #               in conn.register
            conn.deregister()

            return

        conn = self.connection_map.get(socket)
        if conn is None:
            conn = self.connection(socket)
            self.connection_map[socket] = conn

        conn.register(flags)
        
    def _timeout_callback(self, msec):
        self.log.write('timeout: %d' % msec)
        self.timeout = msec

# nginx-poll-proxy.py
import time
import curl_client



hop_by_hop_headers = ['connection', 'keep-alive',
                      'proxy-authenticate', 'proxy-authorization',
                      'te', 'trailers', 'transfer-encoding',
                      'upgrade']

#PROXY_HOST = "http://localhost:10000/"
PROXY_HOST = "http://freebsd:1234"


def application(environ, start_response):
    client = curl_client.CurlClient()
    log = environ['wsgi.errors']

    headers_sent = False

    # Execute the HTTP request
    url = PROXY_HOST + environ['PATH_INFO']
    for keep in client.request(environ, url):
        if client.can_send_headers and not headers_sent:
            # remove hop by hop headers
            headers = [
                (n, v) for n, v in client.headers.items()
                if n.lower() not in hop_by_hop_headers
                ]

            # NOTE: this will not be possible with the current
            #       draft of WSGI 2.0
            start_response('200 OK', headers)

            headers_sent = True

        yield ''

    # info_read() returns (messages still queued, ok handles, failed handles)
    n, success, error = client.multi.info_read()
    assert len(success) + len(error) == 1

    if error:
        headers = [
            ('Server', 'Test-read'),
            ('Content-Type', 'text/plain'),
            ('X-Powered-By', 'Python'),
            ]

        start_response('500 Internal Error', headers)
        yield str(error[0][1:])

        return

    yield client.buf.getvalue()

# nginx-poll-sleep.py
import time
from cStringIO import StringIO
import pycurl


#URL = "http://localhost:10000/"
URL = "http://freebsd:1234/wsgi-file/usr/local/nginx/buf"


def application(environ, start_response):
    buf = StringIO()

    c = pycurl.Curl()
    c.setopt(pycurl.URL, URL)
    c.setopt(pycurl.WRITEFUNCTION, buf.write)

    m = pycurl.CurlMulti()
    m.add_handle(c)

    log = environ['wsgi.errors']

    log.write('bootstrap')
    while 1:
        ret, num_handles = m.perform()
        log.write('ret: %d, num_handles: %d' % (ret, num_handles))

        if ret != pycurl.E_CALL_MULTI_PERFORM:
            break


    log.write('starting')
    while num_handles:
        log.write('polling')
        state = environ['ngx.poll'](500)
        #time.sleep(0.5)
        yield ''

        conn, flags = state()
        log.write('poll state: %s, %s' % (conn, flags))
        while 1:
            ret, num_handles = m.perform()
            log.write('ret: %d, num_handles: %d' % (ret, num_handles))

            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break


    content_type = c.getinfo(pycurl.CONTENT_TYPE)

    n, success, error = m.info_read()
    if error:
        headers = [
            ('Server', 'Test-read'),
            ('Content-Type', 'text/plain'),
            ('X-Powered-By', 'Python'),
            ]

        start_response('500 Internal Error', headers)
        yield str(error[0][1:])

        return

    headers = [
        ('Server', 'Test-read'),
        ('Content-Type', content_type),
        ('X-Powered-By', 'Python'),
        ]

    start_response('200 OK', headers)
    yield buf.getvalue()

_______________________________________________
Web-SIG mailing list
Web-SIG@python.org
Web SIG: http://www.python.org/sigs/web-sig
Unsubscribe: 
http://mail.python.org/mailman/options/web-sig/archive%40mail-archive.com
