-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Nicolas Cannasse a écrit :
>>> Another possibility would be to have an abstract protocol API that is
>>> implemented differently in mod_tora.c (or lighty_tora.c).
>>>
>>> Nicolas
>>>
>>>
>>
>>
>> This is a tora proxy backend implementation for
>> lighttpd 1.5.0 >= r1992 :)
>>
>> The lighttpd proxy layer allows load balancing & failover for free, that's
>> why I've used it instead of a standalone module.

I've optimized the lighttpd patch: I've removed some duplicate buffers
and now use temp files directly; updated version attached.

It still parses all multipart data from the temp files to find boundary
positions before starting to send anything to tora, which can take quite
long, so timeouts can occur with (very) large uploads.

You set the tora socket timeout to 3 seconds; is it possible to increase
it by default without side effects, or to have a command line option for
it?

> 
> Thanks, I'll have a look at integrating that in mod_tora. I'll maybe
> refactor a big chunk of protocol.* in order to separate the apache parts
> from the other ones.

Great, tell me when it's done and I'll update the lighttpd patch.

Do you think we should create a page on the haxe wiki for this patch
now, or is it better to wait for the protocol.* refactoring?

Kaalh

> 
> Best,
> Nicolas
> 



-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.6 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org

iD8DBQFI6f/fWRw620CDc80RAtg7AJ9/zPvIvQ99c76RF07eRGUewcSsBwCgoYAZ
1f3fDT/TWK6uNN/8j1jgxg8=
=OoZK
-----END PGP SIGNATURE-----
diff -ru lighttpd-1.5.0/configure.in lighttpd/configure.in
--- lighttpd-1.5.0/configure.in	2007-09-02 21:46:03.000000000 +0200
+++ lighttpd/configure.in	2008-09-29 20:35:05.000000000 +0200
@@ -632,6 +632,7 @@
 	mod_proxy_backend_fastcgi \
 	mod_proxy_backend_scgi \
 	mod_proxy_backend_ajp13 \
+	mod_proxy_backend_tora \
 	mod_rrdtool \
 	mod_secdownload \
 	mod_setenv \
diff -ru lighttpd-1.5.0/src/Makefile.am lighttpd/src/Makefile.am
--- lighttpd-1.5.0/src/Makefile.am	2007-05-08 10:31:21.000000000 +0200
+++ lighttpd/src/Makefile.am	2008-09-29 20:32:10.000000000 +0200
@@ -243,6 +243,10 @@
 mod_proxy_backend_ajp13_la_LDFLAGS = -module -export-dynamic -avoid-version -no-undefined
 mod_proxy_backend_ajp13_la_LIBADD = $(common_libadd) $(PCRE_LIB)
 
+lib_LTLIBRARIES += mod_proxy_backend_tora.la
+mod_proxy_backend_tora_la_SOURCES = mod_proxy_backend_tora.c
+mod_proxy_backend_tora_la_LDFLAGS = -module -export-dynamic -avoid-version -no-undefined
+mod_proxy_backend_tora_la_LIBADD = $(common_libadd) $(PCRE_LIB)
 
 lib_LTLIBRARIES += mod_ssi.la
 mod_ssi_la_SOURCES = mod_ssi_exprparser.c mod_ssi_expr.c mod_ssi.c
diff -Pru lighttpd-1.5.0/src/mod_proxy_backend_tora.c lighttpd/src/mod_proxy_backend_tora.c
--- lighttpd-1.5.0/src/mod_proxy_backend_tora.c	1970-01-01 01:00:00.000000000 +0100
+++ lighttpd/src/mod_proxy_backend_tora.c	2008-10-06 11:09:03.000000000 +0200
@@ -0,0 +1,826 @@
+/*
+ * Copyright (c) 2008, KaalH! <[EMAIL PROTECTED]>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of the <organization> nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <copyright holder> ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL <copyright holder> BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include "sys-files.h"
+
+#ifdef HAVE_SYS_FILIO_H
+# include <sys/filio.h>
+#endif
+
+#include "mod_proxy_core.h"
+#include "mod_proxy_core_protocol.h"
+#include "configfile.h"
+#include "buffer.h"
+#include "log.h"
+#include "sys-strings.h"
+#include "inet_ntop_cache.h"
+
+#define CORE_PLUGIN "mod_proxy_core"
+
+#define TORA_HEADER_LEN 4
+
+#define BUFSIZE (1 << 16)
+
+#define PARSE_HEADER(start,cursor) \
+        cursor = start; \
+        if( *cursor == '"' ) { \
+                start++; \
+                cursor++; \
+                while( *cursor != '"' && *cursor != 0 ) \
+                        cursor++; \
+        } else { \
+                while( *cursor != 0 && *cursor != '\r' && *cursor != '\n' && *cursor != '\t' ) \
+                        cursor++; \
+        }
+
+typedef struct { /* per-plugin data allocated in mod_proxy_backend_tora_init */
+	PLUGIN_DATA;
+
+	proxy_protocol *protocol; /* handler table returned by proxy_register_protocol("tora") */
+} protocol_plugin_data;
+
+typedef struct { /* header fields of the backend packet currently being decoded */
+	size_t   len;    /* payload length taken from the three length bytes of the header */
+	off_t    offset; /* bytes copied so far by proxy_tora_fill_buffer (header first, then payload) */
+	int      type;   /* proto_code of the packet; 0 means no header has been parsed yet */
+} tora_packet;
+
+typedef struct { /* progress of a multipart/form-data upload being forwarded to the backend */
+	size_t   len;      /* not referenced anywhere else in this file */
+	off_t    offset;   /* parse position inside buf */
+	buffer   *buf;     /* request-body bytes gathered by proxy_tora_fill_mbuffer */
+	buffer   *boundary; /* "--" followed by the boundary token from Content-Type; empty until parsed */
+	chunkqueue *tmp;   /* allocated, reset and freed in this file but otherwise unused here */
+	int		name_sent; /* 1 once the name/filename packets of the current part were queued */
+} multipart;
+
+/**
+ * Per-connection state of the tora protocol decoder. It is created by
+ * tora_state_data_init and stored in proxy_con->protocol_data.
+ */
+typedef struct {
+	buffer        *buf;      /* holds raw header bytes, then the payload of the current packet */
+	off_t         offset;    /* parse offset into buffer; only ever reset to 0 in this file */
+	tora_packet  packet;    /* parsed info about current packet. */
+	size_t        chunk_len; /* chunk length; only ever reset to 0 in this file */
+	size_t	requested_bytes; /* not referenced anywhere else in this file */
+	buffer        *hdkey;      /* header name from the last CODE_HEADER_KEY packet, used by the following value packet */
+	multipart		multipart;	/* holds multipart data */
+} tora_state_data;
+
+typedef enum { /* packet type byte of the tora protocol; enumerators are consecutive, starting at 1 */
+        CODE_FILE = 1,          /* sent: con->physical.path */
+        CODE_URI,               /* sent: con->uri.path */
+        CODE_CLIENT_IP,         /* sent: textual client address */
+        CODE_GET_PARAMS,        /* sent: raw query string */
+        CODE_POST_DATA,         /* sent: raw non-multipart POST body */
+        CODE_HEADER_KEY,        /* sent for request headers; received for response headers */
+        CODE_HEADER_VALUE,      /* sent for request headers; received for response headers */
+        CODE_HEADER_ADD_VALUE,  /* received: handled exactly like CODE_HEADER_VALUE in this file */
+        CODE_PARAM_KEY,         /* sent: URL-decoded parameter name */
+        CODE_PARAM_VALUE,       /* sent: URL-decoded parameter value */
+        CODE_HOST_NAME,         /* sent: con->server_name */
+        CODE_HTTP_METHOD,       /* sent: request method name */
+        CODE_EXECUTE,           /* sent to start the request; received as end-of-response marker */
+        CODE_ERROR,             /* received: logged, decoding returns HANDLER_ERROR */
+        CODE_PRINT,             /* received: response body data */
+        CODE_LOG,               /* received: logged via TRACE */
+        CODE_FLUSH,             /* received: not implemented, only logged */
+        CODE_REDIRECT,          /* received: sets Location and status 301 */
+        CODE_RETURNCODE,        /* received: decimal HTTP status */
+        CODE_QUERY_MULTIPART,   /* received: backend asks for multipart data; payload is a decimal buffer size */
+        CODE_PART_FILENAME,     /* sent: filename of a multipart part */
+        CODE_PART_KEY,          /* sent: field name of a multipart part */
+        CODE_PART_DATA,         /* sent: a piece of part content */
+        CODE_PART_DONE,         /* sent: end of the current part */
+        CODE_TEST_CONNECT,      /* not referenced anywhere else in this file */
+} proto_code;
+
+typedef struct { /* 4-byte packet header; the decoder overlays it on the raw header bytes */
+	unsigned char type;     /* proto_code of the packet */
+	unsigned char lengthB0; /* payload length, bits 0-7 */
+	unsigned char lengthB1; /* payload length, bits 8-15 */
+	unsigned char lengthB2; /* payload length, bits 16-23 */
+} TORA_Header;
+
+/* Append the 4-byte packet header to b: the type byte, then the low 24 bits of len, least significant byte first. */
+void tora_encode_header(buffer *b, proto_code code,int len ) {
+	unsigned char hdr[4];
+	int i;
+	hdr[0] = (unsigned char)code;
+	for (i = 1; i < (int)sizeof hdr; i++)
+		hdr[i] = (unsigned char)(len >> (8 * (i - 1))); /* shifts of 0, 8 and 16 bits */
+	buffer_append_string_len(b,(char*)hdr,sizeof hdr);
+}
+void tora_encode_size(buffer *b, proto_code code, const char *str, int len ) { /* append one packet to b: header, then len bytes of str */
+	tora_encode_header(b,code,len);
+	buffer_append_string_len(b,str,len); /* callers in this file pass str == NULL only together with len == 0 */
+}
+void tora_encode(buffer *b, proto_code code, const char *str ) { /* append a packet whose payload is the NUL-terminated string str */
+	tora_encode_size(b,code,str?str:"",str?(int)strlen(str):0); /* a NULL str is sent as an empty payload instead of crashing in strlen() */
+}
+/* Find the NUL-terminated needle v within the first mlen bytes of mem (which need not be NUL-terminated); returns the match or NULL. */
+static char *memfind( char *mem, int mlen, const char *v ) {
+        char *found;
+        int len = (int)strlen(v);
+        if( len == 0 )
+                return mem; /* an empty needle matches at the start */
+        while( mem != NULL && mlen >= len && (found = memchr(mem,*v,mlen)) != NULL ) { /* NULL haystack or negative/short mlen never reaches memchr() */
+                if( (int)(found - mem) + len > mlen )
+                        break; /* too few bytes left after this candidate */
+                if( memcmp(found,v,len) == 0 )
+                        return found;
+                mlen -= (int)(found - mem + 1);
+                mem = found + 1;
+        }
+        return NULL;
+}
+/* Value of one hexadecimal digit, or -1 when c is not a hex digit. */
+static int url_hex_value( int c ) {
+        if( c >= '0' && c <= '9' ) return c - '0';
+        if( c >= 'a' && c <= 'f' ) return c - 'a' + 10;
+        if( c >= 'A' && c <= 'F' ) return c - 'A' + 10;
+        return -1;
+}
+
+/*
+ * Decode len bytes of URL-encoded text from bin into bout and return the decoded
+ * length. '+' becomes a space and "%XX" becomes the byte 0xXX; a "%XX" containing a
+ * non-hex digit is dropped entirely, and a '%' followed by fewer than two bytes ends
+ * decoding. bout is always NUL-terminated and needs room for len + 1 bytes.
+ */
+int url_decode( const char *bin, int len, char *bout ) {
+        const char *src = bin;
+        char *dst = bout;
+        while( len-- > 0 ) {
+                char c = *src++;
+                if( c == '%' ) {
+                        int hi, lo;
+                        if( len < 2 )
+                                break;
+                        hi = url_hex_value(src[0]);
+                        lo = url_hex_value(src[1]);
+                        src += 2;
+                        len -= 2;
+                        if( hi < 0 || lo < 0 )
+                                continue;
+                        c = (char)((unsigned char)((hi << 4) + lo));
+                } else if( c == '+' ) {
+                        c = ' ';
+                }
+                *dst++ = c;
+        }
+        *dst = 0;
+        return (int)(dst - bout);
+}
+
+#define DEFAULT_SIZE    256
+/* URL-decode str[0..len) and append the result to b as one packet of type code. */
+static void tora_param_encode( buffer *b, proto_code code, const char *str, int len ) {
+        char tmp[DEFAULT_SIZE], *buf = NULL; /* tmp serves inputs shorter than DEFAULT_SIZE */
+        int size;
+        if( len >= DEFAULT_SIZE && (buf = malloc(len+1)) == NULL ) { /* falling back to tmp here would overflow it */
+                ERROR("%s","out of memory while decoding a request parameter");
+                return;
+        }
+        size = url_decode(str,len,buf?buf:tmp);
+        tora_encode_size(b,code,buf?buf:tmp,size);
+        free(buf); /* free(NULL) is a no-op */
+}
+/*
+ * Split the query string in b at '&' / ';' and append a URL-decoded CODE_PARAM_KEY / CODE_PARAM_VALUE
+ * packet pair to r for every "key=value" field (fields without '=' are skipped); b is restored afterwards.
+ */
+void tora_parse_params(buffer *b, buffer *r) {
+	char *args = BUF_STR(b);
+
+	/* presumably an empty buffer can carry a NULL string pointer; strchr(NULL) would crash */
+	if( args == NULL )
+		return;
+	for( ;; ) {
+		/* the first separator of either kind ends the current field */
+		char *sep = strpbrk(args,"&;");
+		char saved = sep ? *sep : 0;
+		char *eq;
+		if( sep != NULL )
+			*sep = 0;
+		eq = strchr(args,'=');
+		if( eq != NULL ) {
+			*eq = 0;
+			tora_param_encode(r,CODE_PARAM_KEY,args,(int)(eq-args));
+			tora_param_encode(r,CODE_PARAM_VALUE,eq+1,(int)strlen(eq+1));
+			*eq = '=';
+		}
+		if( sep == NULL )
+			break;
+		*sep = saved;
+		args = sep+1;
+	}
+}
+/* Allocate a zeroed decoder state together with its buffers; returns NULL when the allocation fails. */
+tora_state_data *tora_state_data_init(void) {
+	tora_state_data *data = calloc(1, sizeof(*data));
+
+	if (!data) return NULL; /* never write through a failed calloc() */
+	data->buf = buffer_init();
+	data->hdkey = buffer_init();
+	data->multipart.buf = buffer_init();
+	data->multipart.boundary = buffer_init();
+	data->multipart.tmp = chunkqueue_init();
+	return data;
+}
+/* Installed as proxy_stream_init: create the tora decoder state for this backend connection if it has none yet. */
+PROXY_CONNECTION_FUNC(proxy_tora_init) {
+
+	UNUSED(srv);
+
+	if(!proxy_con->protocol_data) {
+		proxy_con->protocol_data = tora_state_data_init();
+	}
+	return 1;
+}
+/* Release a decoder state and everything tora_state_data_init allocated for it; data must not be NULL. */
+void tora_state_data_free(tora_state_data *data) {
+	buffer_free(data->buf);
+	buffer_free(data->hdkey);
+	buffer_free(data->multipart.boundary);
+	buffer_free(data->multipart.buf);
+	chunkqueue_free(data->multipart.tmp);
+	free(data);
+}
+/* Clear per-packet state; proxy_tora_stream_decoder_internal calls this after every completely decoded packet. */
+void tora_state_data_reset(tora_state_data *data) {
+	buffer_reset(data->buf);
+	buffer_reset(data->multipart.buf);
+	buffer_reset(data->multipart.boundary); /* NOTE(review): multipart progress is cleared on every packet boundary too - confirm this is intended */
+	chunkqueue_reset(data->multipart.tmp);
+	data->packet.len = 0;
+	data->packet.offset = 0;
+	data->packet.type = 0; /* 0 makes the decoder read a new header next */
+	data->offset = 0;
+	data->chunk_len = 0;
+	data->multipart.offset = 0;
+	data->multipart.name_sent = 0; /* hdkey is left untouched: a CODE_HEADER_VALUE packet needs the key stored by the preceding CODE_HEADER_KEY */
+}
+/* Installed as proxy_stream_cleanup: free the decoder state attached to this backend connection, if any. */
+PROXY_CONNECTION_FUNC(proxy_tora_cleanup) {
+
+	UNUSED(srv);
+
+	if(proxy_con->protocol_data) {
+		tora_state_data_free((tora_state_data *)proxy_con->protocol_data);
+		proxy_con->protocol_data = NULL; /* avoids a second free on a repeated cleanup */
+	}
+	return 1;
+}
+
+/*
+ * Append up to len bytes from the chunks of in to data->buf, consuming them from the queue; returns the number of bytes still missing (0 = complete).
+ */
+static int proxy_tora_fill_buffer(tora_state_data *data, chunkqueue *in, size_t len) {
+	off_t we_have = 0, we_need = len;
+	chunk *c;
+
+	buffer_prepare_append(data->buf, we_need);
+	for (c = in->first; c && we_need > 0; c = c->next) {
+		if(c->mem->used == 0) continue; /* NOTE(review): reads c->mem without checking the chunk type - confirm only memory chunks arrive here */
+		we_have = c->mem->used - c->offset - 1; /* the -1 presumably excludes the buffer's trailing NUL - confirm */
+		if (we_have == 0) continue;
+		if (we_have > we_need) we_have = we_need;
+		buffer_append_string_len(data->buf, c->mem->ptr + c->offset, we_have);
+		data->packet.offset += we_have; /* progress within the current header/payload */
+		c->offset += we_have;
+		in->bytes_out += we_have;
+		we_need -= we_have;
+	}
+	return we_need;
+}
+
+
+/*
+ * Append up to len bytes from the chunks of in to data->multipart.buf; returns the bytes still missing, or -2 when a file chunk cannot be opened or mapped.
+ */
+static int proxy_tora_fill_mbuffer(tora_state_data *data, chunkqueue *in, size_t len) {
+	off_t we_have = 0, we_need = len;
+	chunk *c;
+
+	for (c = in->first; c && we_need > 0; c = c->next) {
+		if(c->type == FILE_CHUNK) {
+			we_have = c->file.length - c->offset;
+			if (we_have > we_need) we_have = we_need;
+			if (c->file.mmap.start == MAP_FAILED) { /* not mapped yet: map the whole file once */
+				if (-1 == c->file.fd &&  /* open the file if not already open */
+				    -1 == (c->file.fd = open(c->file.name->ptr, O_RDONLY | O_BINARY))) {
+					ERROR("open(%s) failed: %s", BUF_STR(c->file.name), strerror(errno));
+					return -2;
+				}
+				if (MAP_FAILED == (c->file.mmap.start = mmap(0, c->file.length, PROT_READ, MAP_SHARED, c->file.fd, 0))) {
+					ERROR("mmap(%s) failed: %s", BUF_STR(c->file.name), strerror(errno));
+					return -2;
+				}
+				close(c->file.fd); /* the descriptor is closed right after mapping */
+				c->file.fd = -1;
+				c->file.mmap.length = c->file.length;
+			}
+			buffer_append_string_len(data->multipart.buf, c->file.mmap.start + c->offset, we_have); /* file chunks are copied only: c->offset and in->bytes_out are not advanced in this branch */
+		} else {
+			if(c->mem->used == 0) continue;
+			we_have = c->mem->used - c->offset - 1; /* the -1 presumably excludes the buffer's trailing NUL - confirm */
+			if (we_have == 0) continue;
+			if (we_have > we_need) we_have = we_need;
+			buffer_append_string_len(data->multipart.buf, c->mem->ptr + c->offset, we_have);
+			c->offset += we_have; /* memory chunks are consumed immediately */
+			in->bytes_out += we_have;
+		}
+		we_need -= we_have;
+	}
+	return we_need;
+}
+
+/* Forward the next piece of a multipart/form-data request body to the backend as CODE_PART_* packets; bufsize is the window size the backend asked for. Returns a handler_t value. */
+static int proxy_tora_stream_encoder_multipart (proxy_session *sess, unsigned int bufsize) {
+	proxy_connection *proxy_con = sess->proxy_con;
+	chunkqueue *in =  sess->remote_con->recv;
+	chunkqueue *out = proxy_con->send;
+	tora_state_data *data = (tora_state_data *)proxy_con->protocol_data;
+	size_t we_need = 0;
+	buffer *buf;
+	char *name, *end_name, *filename = NULL, *end_file_name = NULL, *body = NULL;
+	int len = 0;
+	char *boundstr = NULL;
+	unsigned int boundstr_len;
+	const char *boundary, *bend;
+
+	if(buffer_is_empty(data->multipart.boundary) ) { /* first call for this body: derive "--<boundary>" from Content-Type */
+
+		data_string *ds;
+
+	    ds = (data_string *) array_get_element(sess->remote_con->request.headers, CONST_STR_LEN("Content-Type"));
+		if( ds == NULL || (boundary = strstr(BUF_STR(ds->value),"boundary=")) == NULL ) { /* the header itself may be missing */
+			return HANDLER_FINISHED;
+		}
+		boundary += 9;
+		PARSE_HEADER(boundary,bend);
+		len = (int)(bend - boundary);
+		boundstr_len = len + 2;
+		if( boundstr_len > bufsize / 2 )
+			return HANDLER_GO_ON;
+		boundstr = (char*)malloc(boundstr_len + 1);
+		if( boundstr == NULL ) return HANDLER_ERROR; /* out of memory: fail the request instead of writing through NULL */
+		boundstr[0] = boundstr[1] = '-';
+		boundstr[boundstr_len] = 0;
+		memcpy(boundstr+2,boundary,len);
+		buffer_copy_string(data->multipart.boundary,boundstr);
+		free(boundstr);
+	}
+
+	if(data->multipart.buf->used < bufsize) { /* top the window up to bufsize bytes */
+		we_need = proxy_tora_fill_mbuffer(data, in, bufsize - data->multipart.buf->used);
+	}
+
+	if(data->multipart.name_sent == 0) {
+		/* send name & filename */
+		buf = chunkqueue_get_append_buffer(out);
+		len = data->multipart.buf->used;
+		name = memfind(data->multipart.buf->ptr  + data->multipart.offset ,len - (int)data->multipart.offset,"Content-Disposition:"); /* search only the bytes that remain after offset */
+		if( name == NULL ) {
+			ERROR("%s","no content disposition in multipart data");
+			chunkqueue_remove_finished_chunks(in);
+			buffer_reset(data->multipart.buf);
+			return HANDLER_FINISHED;
+		}
+		name = memfind(name, len - (int)(name - data->multipart.buf->ptr),"name=");
+		if( name == NULL ) {
+			ERROR("%s","no content name in multipart data");
+			chunkqueue_remove_finished_chunks(in);
+			buffer_reset(data->multipart.buf);
+			return HANDLER_GO_ON;
+		}
+		name += 5;
+		PARSE_HEADER(name,end_name);
+		body = memfind(end_name, len - (int)(end_name - data->multipart.buf->ptr),"\r\n\r\n");
+		if( body == NULL ) {
+			ERROR("%s","corrupted multipart data");
+			chunkqueue_remove_finished_chunks(in);
+			buffer_reset(data->multipart.buf);
+			return HANDLER_GO_ON;
+		}
+		filename = memfind(name,(int)(body - name),"filename="); /* only looked for inside this part's headers */
+		if( filename != NULL ) {
+				filename += 9;
+				PARSE_HEADER(filename,end_file_name);
+		}
+		body += 4;
+		data->multipart.offset = (int)(body - data->multipart.buf->ptr); /* the part content starts here */
+		if( filename )
+			tora_encode_size(buf,CODE_PART_FILENAME, filename,(int)(end_file_name - filename));
+		tora_encode_size(buf,CODE_PART_KEY, name,(int)(end_name - name));
+     	out->bytes_in += buf->used - 1;
+		data->multipart.name_sent = 1;
+
+		return HANDLER_GO_ON;
+	}
+
+	buf = buffer_init();
+
+	boundary = memfind(data->multipart.buf->ptr  + data->multipart.offset ,data->multipart.buf->used - data->multipart.offset,BUF_STR(data->multipart.boundary));
+	if(boundary == NULL) { /* no boundary in the window: everything buffered is content of the current part */
+		tora_encode_header(buf,CODE_PART_DATA,data->multipart.buf->used-1 - data->multipart.offset);
+		chunkqueue_append_buffer(out,buf);
+		buffer_free(buf);
+
+		chunk *c = in->first; /* NOTE(review): assumes the input queue is not empty here - confirm */
+		c->offset += data->multipart.offset;
+		chunkqueue_steal_chunks_len(out,c,data->multipart.buf->used-1 - data->multipart.offset);
+		out->bytes_in += data->multipart.buf->used-1 - data->multipart.offset;
+		in->bytes_out += data->multipart.buf->used-1 - data->multipart.offset;
+		if(chunk_is_done(c))
+			munmap(c->file.mmap.start,c->file.mmap.length); /* NOTE(review): executed without checking that c is a mapped file chunk - confirm */
+		chunkqueue_remove_finished_chunks(in);
+
+		buffer_reset(data->multipart.buf);
+		data->multipart.offset = 0;
+		return HANDLER_GO_ON;
+	} else {
+		tora_encode_size(buf,CODE_PART_DATA, data->multipart.buf->ptr + data->multipart.offset,(int)(boundary - data->multipart.buf->ptr) - data->multipart.offset - 2); /* -2 drops the CRLF in front of the boundary line */
+		tora_encode_size(buf,CODE_PART_DONE,NULL,0);
+		out->bytes_in += buf->used -1;
+		chunkqueue_append_buffer(out,buf);
+		buffer_free(buf);
+
+		data->multipart.offset = (int)(boundary - data->multipart.buf->ptr) + strlen(BUF_STR(data->multipart.boundary));
+		data->multipart.name_sent = 0; /* the next call parses the headers of the following part */
+
+		if(strcmp(data->multipart.buf->ptr + data->multipart.offset ,"--\r\n") == 0 ) { /* closing delimiter: the body is complete */
+			buffer_reset(data->multipart.buf);
+			buffer_reset(data->multipart.boundary);
+			return HANDLER_FINISHED;
+		}
+	}
+
+	return HANDLER_GO_ON;
+}
+/* Decode at most one backend packet from proxy_con->recv and apply it to the response (headers, status, body moved to out); returns a handler_t value. */
+PROXY_STREAM_DECODER_FUNC(proxy_tora_stream_decoder_internal) {
+	proxy_connection *proxy_con = sess->proxy_con;
+	chunkqueue *in = proxy_con->recv;
+	tora_state_data *data = (tora_state_data *)proxy_con->protocol_data;
+	size_t we_have = 0, we_need = 0;
+	TORA_Header *header;
+	handler_t rc = HANDLER_GO_ON;
+	char buf[32];
+	int res;
+
+	UNUSED(srv);
+
+	if ((in->bytes_in == in->bytes_out) && in->is_closed) {
+		/* everything got passed through,
+		 *
+		 * as we usually have a FIN packet we should expect to get a is_closed within the
+		 * tora stream. Looks like the remote side crashed
+		 *  */
+		out->is_closed = 1;
+
+		TRACE("%ld / %ld -> %d",
+				in->bytes_in, in->bytes_out,
+				in->is_closed);
+
+		ERROR("looks like the tora backend (%s) terminated before it sent a FIN packet", BUF_STR(sess->request_uri));
+
+		return HANDLER_FINISHED;
+	}
+
+	/* no data ? */
+	if (!in->first) return HANDLER_GO_ON;
+
+	if(data->packet.type == 0) { /* no header parsed yet: collect the 4 header bytes, possibly across calls */
+		we_need = (TORA_HEADER_LEN - data->packet.offset);
+		we_need = proxy_tora_fill_buffer(data, in, we_need);
+		if(we_need > 0) {
+			/* we need more data to parse the header. */
+			chunkqueue_remove_finished_chunks(in);
+			return HANDLER_GO_ON;
+		}
+		header = (TORA_Header *)(data->buf->ptr);
+		data->packet.type = header->type;
+		data->packet.len = ((header->lengthB0 | header->lengthB1 << 8) | header->lengthB2 << 16); /* 24-bit length, least significant byte first */
+		data->packet.offset = 0;
+
+		/* Finished parsing raw header bytes. */
+		buffer_reset(data->buf);
+	}
+
+	/* proccess the packet's contents. */
+	we_need = data->packet.len - data->packet.offset;
+
+	/* for most packet types copy the content into the data buffer */
+	if (data->packet.type != CODE_PRINT) { /* CODE_PRINT payload is moved straight to out in the switch below */
+		if(we_need > 0) {
+			/* copy tora packet contents to buffer */
+			we_need = proxy_tora_fill_buffer(data, in, we_need);
+			/* make sure we have the full tora packet content. */
+			if(we_need > 0) {
+				chunkqueue_remove_finished_chunks(in);
+				return HANDLER_GO_ON;
+			}
+		}
+	}
+
+	switch(data->packet.type) {
+		case CODE_EXECUTE: /* end of the response: close both queues and finish the request */
+			data->packet.offset = 0;
+			in->is_closed = 1;
+			out->is_closed = 1;
+			sess->is_request_finished = 1;
+			proxy_con->send->is_closed = 1;
+			rc = HANDLER_FINISHED;
+			break;
+		case CODE_HEADER_KEY: /* remember the name until the value packet arrives */
+			buffer_copy_string_buffer(data->hdkey,data->buf);
+			break;
+		case CODE_HEADER_VALUE:
+			array_append_key_value(sess->resp->headers, CONST_BUF_LEN(data->hdkey), CONST_BUF_LEN(data->buf));
+			break;
+		case CODE_HEADER_ADD_VALUE: /* handled exactly like CODE_HEADER_VALUE */
+			array_append_key_value(sess->resp->headers, CONST_BUF_LEN(data->hdkey), CONST_BUF_LEN(data->buf));
+			break;
+		case CODE_PRINT:
+			LI_ltostr(buf, we_need);
+			array_append_key_value(sess->resp->headers, CONST_STR_LEN("Content-Length"), buf, strlen(buf)); /* NOTE(review): appended for every CODE_PRINT packet, so several packets would add several Content-Length headers - confirm */
+			if(sess->have_response_headers < 1) {
+				if(sess->resp->status == -1) {
+					sess->resp->status = 200;
+				}
+				if(array_get_element(sess->resp->headers, CONST_STR_LEN("Content-Type")) == NULL) {
+					array_append_key_value(sess->resp->headers, CONST_STR_LEN("Content-Type"),CONST_STR_LEN("text/html"));
+				}
+				sess->have_response_headers = 1;
+			}
+			we_have = chunkqueue_steal_chunks_len(out, in->first, we_need);
+			we_need -= we_have; /* NOTE(review): packet.offset is not advanced here, so a partially available payload looks like the full length again on the next call - confirm */
+			in->bytes_out += we_have;
+			out->bytes_in += we_have;
+			break;
+		case CODE_LOG:
+			TRACE("[mod_tora] %s", BUF_STR(data->buf));
+			break;
+		case CODE_ERROR:
+			TRACE("[mod_tora] Error: %s", BUF_STR(data->buf));
+			rc = HANDLER_ERROR;
+			break;
+		case CODE_FLUSH:
+			/* TODO: how does lighty flush output ??? */
+			TRACE("[mod_tora] NOTICE: %s","flush not implemented");
+			break;
+		case CODE_REDIRECT:
+			array_append_key_value(sess->resp->headers, CONST_STR_LEN("Location"), CONST_BUF_LEN(data->buf));
+			sess->resp->status = 301;
+			break;
+		case CODE_RETURNCODE:
+			sess->resp->status = atoi(BUF_STR(data->buf)); /* payload is the decimal status code */
+			break;
+		case CODE_QUERY_MULTIPART: /* backend asks for the multipart body; payload is the decimal buffer size */
+			chunkqueue_reset(proxy_con->send);
+			do {
+				/* decode the packet */
+				res = proxy_tora_stream_encoder_multipart(sess,atoi(BUF_STR(data->buf)));
+			} while (proxy_con->recv->first && res == HANDLER_GO_ON);
+			rc = res;
+			break;
+		default:
+			TRACE("[mod_tora] %s","Unexpected code");
+			rc = HANDLER_ERROR;
+			break;
+	}
+
+	if(we_need == 0) {
+		/* packet finished, reset state for next packet */
+		tora_state_data_reset(data);
+	}
+
+	chunkqueue_remove_finished_chunks(in);
+
+	return rc;
+}
+/* Installed as proxy_stream_decoder: decode backend packets until the receive queue is empty or a packet yields something other than HANDLER_GO_ON. */
+PROXY_STREAM_DECODER_FUNC(proxy_tora_stream_decoder) {
+	chunkqueue *backend_in = sess->proxy_con->recv;
+	int status;
+
+	/* output queue already closed: nothing left to decode */
+	if (out->is_closed) {
+		return 1;
+	}
+	for (;;) {
+		status = proxy_tora_stream_decoder_internal(srv, sess, out);
+		if (status != HANDLER_GO_ON || !backend_in->first) break;
+	}
+	return status;
+}
+
+
+/**
+ * Installed as proxy_stream_encoder: queue the query-string parameters, the HTTP method and a
+ * non-multipart POST body as tora packets. A multipart body is not sent here; it is forwarded
+ * later, when the backend answers with CODE_QUERY_MULTIPART (see the stream decoder).
+ */
+PROXY_STREAM_ENCODER_FUNC(proxy_tora_stream_encoder) {
+	proxy_connection *proxy_con = sess->proxy_con;
+	connection *con = sess->remote_con;
+	chunkqueue *out = proxy_con->send;
+	handler_t rc = HANDLER_GO_ON;
+	size_t we_have = 0;
+	buffer *b, *bhm;
+	chunk *c;
+
+	UNUSED(srv);
+
+	/* output queue closed, can't encode any more data. */
+	if(out->is_closed) return HANDLER_FINISHED;
+
+	b = chunkqueue_get_append_buffer(out);
+	bhm = buffer_init(); /* created only after the early return above, so that path no longer leaks it */
+	buffer_copy_string(bhm,get_http_method_name(con->request.http_method));
+
+    if (buffer_is_empty(con->uri.query)) {
+    	/* strchr() returns a pointer into request.uri: nothing to allocate and nothing to free */
+    	const char *qstr = strchr(con->request.uri->ptr, '?');
+    	if (NULL != qstr) {
+			/** extract query string from request.uri */
+    		buffer_copy_string(con->uri.query, qstr + 1);
+    	}
+	}
+    if (!buffer_is_empty(con->uri.query)) {
+		tora_encode(b,CODE_GET_PARAMS, BUF_STR(con->uri.query));
+		tora_parse_params(con->uri.query,b);
+	}
+	if (strcmp(bhm->ptr,"POST") == 0) {
+
+		unsigned int is_multipart = 0;
+		data_string *ds;
+
+        ds =  (data_string *) array_get_element(con->request.headers, CONST_STR_LEN("Content-Type"));
+		if(ds != NULL && strstr(BUF_STR(ds->value),"multipart/form-data")) /* a POST may arrive without Content-Type */
+			is_multipart = 1;
+
+		if(is_multipart == 0) {
+			if(chunkqueue_length(in)> BUFSIZE) {
+				ERROR("[mod_tora] %s","Maximum POST data exceeded. Try using multipart encoding");
+			} else {
+
+				chunkqueue *tmp = chunkqueue_init();
+				buffer *r = buffer_init();
+
+				we_have = chunkqueue_steal_all_chunks(tmp, in);
+				in->bytes_out += we_have;
+				tmp->bytes_in += we_have;
+
+				for (c = tmp->first; c; c = c->next) { /* NOTE(review): treats every chunk as a memory chunk - confirm file chunks cannot occur below BUFSIZE */
+					buffer_append_string(r,c->mem->ptr + c->offset);
+				}
+
+				chunkqueue_free(tmp);
+
+				tora_encode(b,CODE_HTTP_METHOD, bhm->ptr);
+				tora_encode(b,CODE_POST_DATA, BUF_STR(r));
+				tora_parse_params(r,b);
+
+				buffer_free(r);
+			}
+		} else {
+			tora_encode_size(b,CODE_EXECUTE, NULL,0); /* start the script now; the body is pulled on request */
+			rc = HANDLER_FINISHED;
+		}
+
+	} else {
+		tora_encode(b,CODE_HTTP_METHOD, bhm->ptr);
+	}
+
+	buffer_free(bhm);
+
+	if (in->bytes_in == in->bytes_out && in->is_closed) { /* request body completely consumed */
+		out->is_closed = 1;
+		tora_encode_size(b,CODE_EXECUTE, NULL,0);
+		return HANDLER_FINISHED;
+	}
+
+	return rc;
+}
+
+/**
+ * Installed as proxy_encode_request_headers: queue the script path, URI, host name, client
+ * address and every non-empty request header as tora packets on the backend send queue.
+ */
+PROXY_STREAM_ENCODER_FUNC(proxy_tora_encode_request_headers) {
+	connection *con = sess->remote_con;
+	chunkqueue *out = sess->proxy_con->send;
+	buffer *b = chunkqueue_get_append_buffer(out);
+	unsigned int idx = 0;
+
+	/* srv is used below for the client-address lookup; only in is unused */
+	UNUSED(in);
+
+	/* request metadata first */
+	tora_encode(b,CODE_FILE,BUF_STR(con->physical.path));
+	tora_encode(b,CODE_URI,BUF_STR(con->uri.path));
+	tora_encode(b,CODE_HOST_NAME,BUF_STR(con->server_name));
+	tora_encode(b,CODE_CLIENT_IP, inet_ntop_cache_get_ip(srv, &(con->dst_addr)));
+
+	/* then one KEY/VALUE packet pair per header that has both a name and a value */
+	while (idx < con->request.headers->used) {
+		data_string *hdr = (data_string *)con->request.headers->data[idx++];
+		if (!buffer_is_empty(hdr->value) && !buffer_is_empty(hdr->key)) {
+			tora_encode(b,CODE_HEADER_KEY,BUF_STR(hdr->key));
+			tora_encode(b,CODE_HEADER_VALUE,BUF_STR(hdr->value));
+		}
+	}
+
+	/* the -1 presumably excludes the buffer's trailing NUL from the byte count - confirm */
+	out->bytes_in += b->used - 1;
+
+	return HANDLER_FINISHED;
+}
+/* Plugin init: register the "tora" protocol with mod_proxy_core and install this module's callbacks; returns the plugin data, or NULL on failure. */
+INIT_FUNC(mod_proxy_backend_tora_init) {
+	mod_proxy_core_plugin_data *core_data;
+	protocol_plugin_data *p;
+
+	/* get the plugin_data of the core-plugin */
+	core_data = plugin_get_config(srv, CORE_PLUGIN);
+	if(!core_data) return NULL;
+
+	p = calloc(1, sizeof(*p));
+	if(!p) return NULL; /* out of memory: fail init instead of dereferencing NULL below */
+	/* define protocol handler callbacks */
+	p->protocol = (core_data->proxy_register_protocol)("tora");
+
+	p->protocol->proxy_stream_init = proxy_tora_init;
+	p->protocol->proxy_stream_cleanup = proxy_tora_cleanup;
+	p->protocol->proxy_stream_decoder = proxy_tora_stream_decoder;
+	p->protocol->proxy_stream_encoder = proxy_tora_stream_encoder;
+	p->protocol->proxy_encode_request_headers = proxy_tora_encode_request_headers;
+
+	return p;
+}
+/* Plugin cleanup: release the plugin data allocated by mod_proxy_backend_tora_init (p_d may be NULL). */
+FREE_FUNC(mod_proxy_backend_tora_free) {
+	protocol_plugin_data *plugin_state = p_d;
+
+	UNUSED(srv);
+
+	if (plugin_state != NULL) {
+		free(plugin_state);
+	}
+
+	return HANDLER_GO_ON;
+}
+/* Exported plugin entry point: fill in the plugin descriptor p and list mod_proxy_core as a required plugin; always returns 0. */
+LI_EXPORT int mod_proxy_backend_tora_plugin_init(plugin *p) {
+	data_string *core_dep = data_string_init();
+
+	/* declare the dependency on the proxy core plugin */
+	buffer_copy_string_len(core_dep->value, CONST_STR_LEN(CORE_PLUGIN));
+	array_insert_unique(p->required_plugins, (data_unset *)core_dep);
+
+	/* descriptor fields */
+	p->name         = buffer_init_string("mod_proxy_backend_tora");
+	p->version      = LIGHTTPD_VERSION_ID;
+	p->data         = NULL;
+
+	p->cleanup      = mod_proxy_backend_tora_free;
+	p->init         = mod_proxy_backend_tora_init;
+
+	return 0;
+}
+
+
-- 
Neko : One VM to run them all
(http://nekovm.org)

Reply via email to