-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1
Nicolas Cannasse a écrit :
> KaalH! a écrit :
>> -----BEGIN PGP SIGNED MESSAGE-----
>> Hash: SHA1
>>
>> Nicolas Cannasse a écrit :
>>> KaalH! a écrit :
>>>> -----BEGIN PGP SIGNED MESSAGE-----
>>>> Hash: SHA1
>>>>
>>>> Sugoi nicolas!, tora is just was I expected to deploy scalable neko
>>>> projects.
>>>>
>>>> The following patch fix a compilation issue on ubuntu.
>>> Patched, thanks for your contribution ;)
>>
>> :)
>>
>>> Best,
>>> Nicolas
>>>
>>
>> Nicolas, is the tora client protocol api stable ?
>>
>> I've started a lighttpd >= 1.5 proxy backend implementation for tora,
>> and since protocol.c has dependencies with apache libs, I have to copy &
>> paste entire functions from it to mod_proxy_backend_tora.c.
>>
>> A generic tora client lib used by lighttpd/nginx implementations would
>> be a better way to do it, but I lack knowledge & time to do it alone.
>>
>> Hey neko community !, what about setting a google/sf/whatever project
>> for it and release a tora client lib, lighttpd & nginx implementations
>> before nicolas write it in 12'23" if/when he'll need it for MT ? :)
>
> We actually need Lighttpd support, but I don't have time for it right
> now :) Thank you for contributing it, I will integerate it in main
> distribution when done.
>
> The API is quite stable but I would prefer protocol.c + .h to remain the
> same on all servers. Would it not be possible to use #ifdef instead of
> copy/paste ?
>
> Another possibility would be to have an abstract protocol API that is
> implemented differently in mod_tora.c (or lighty_tora.c).
>
> Nicolas
>
>
This is a tora proxy backend implementation for
lighttpd 1.5.0 >= r1992 :)
lighttpd proxy layer allow load balancing & failover for free, that's
why I've use it instead of a standalone module.
I don't used protocol.c at all, most functions depends of mcontext,
and the lighttpd mod_proxy architecture has many differences with the
apache module.
There's still a network timeout issue with large multipart data ( > 100
MB on my testing environment).
All the data is mmaped from temp files, parsed to find boundaries,
pushed in output "chunkqueue" in memory, and then sent to the tora server.
This process is quite inefficient, but I lack time to optimize it now.
Installation :
wget http://www.lighttpd.net/download/lighttpd-1.5.0-r1992.tar.gz -O - |
tar zxf -
cd lighttpd-1.5.0
patch -p1 < ../lighty_tora.diff
autoreconf
./configure ...
make
sudo make install
Configuration sample :
server.modules += ( "mod_proxy_core", "mod_proxy_backend_tora" )
$HTTP["host"] =~ ".*" {
server.document-root = "/var/www"
$PHYSICAL["existing-path"] =~ "\.n$" {
proxy-core.protocol = "tora"
proxy-core.backends = ( "127.0.0.1:6666" )
}
}
This module was not extensively tested, it's just start to work a few
hours ago, so please give it a try and report issues.
Kaalh
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.6 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org
iD8DBQFI5ipfWRw620CDc80RAhYFAKCSOsF+LpxbvqLxWVsZ4htYGev3bQCfc9qW
b9Fjb+SjOcr50DbflSWenPg=
=rC2Z
-----END PGP SIGNATURE-----
diff -ru lighttpd-1.5.0/configure.in lighttpd/configure.in
--- lighttpd-1.5.0/configure.in 2007-09-02 21:46:03.000000000 +0200
+++ lighttpd/configure.in 2008-09-29 20:35:05.000000000 +0200
@@ -632,6 +632,7 @@
mod_proxy_backend_fastcgi \
mod_proxy_backend_scgi \
mod_proxy_backend_ajp13 \
+ mod_proxy_backend_tora \
mod_rrdtool \
mod_secdownload \
mod_setenv \
diff -ru lighttpd-1.5.0/src/Makefile.am lighttpd/src/Makefile.am
--- lighttpd-1.5.0/src/Makefile.am 2007-05-08 10:31:21.000000000 +0200
+++ lighttpd/src/Makefile.am 2008-09-29 20:32:10.000000000 +0200
@@ -243,6 +243,10 @@
mod_proxy_backend_ajp13_la_LDFLAGS = -module -export-dynamic -avoid-version -no-undefined
mod_proxy_backend_ajp13_la_LIBADD = $(common_libadd) $(PCRE_LIB)
+lib_LTLIBRARIES += mod_proxy_backend_tora.la
+mod_proxy_backend_tora_la_SOURCES = mod_proxy_backend_tora.c
+mod_proxy_backend_tora_la_LDFLAGS = -module -export-dynamic -avoid-version -no-undefined
+mod_proxy_backend_tora_la_LIBADD = $(common_libadd) $(PCRE_LIB)
lib_LTLIBRARIES += mod_ssi.la
mod_ssi_la_SOURCES = mod_ssi_exprparser.c mod_ssi_expr.c mod_ssi.c diff -Pru lighttpd-1.5.0/src/mod_proxy_backend_tora.c lighttpd/src/mod_proxy_backend_tora.c
--- lighttpd-1.5.0/src/mod_proxy_backend_tora.c 1970-01-01 01:00:00.000000000 +0100
+++ lighttpd/src/mod_proxy_backend_tora.c 2008-10-03 14:59:24.000000000 +0200
@@ -0,0 +1,826 @@
+/*
+ * Copyright (c) 2008, KaalH! <[EMAIL PROTECTED]>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of the <organization> nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <copyright holder> ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL <copyright holder> BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include "sys-files.h"
+
+#ifdef HAVE_SYS_FILIO_H
+# include <sys/filio.h>
+#endif
+
+#include "mod_proxy_core.h"
+#include "mod_proxy_core_protocol.h"
+#include "configfile.h"
+#include "buffer.h"
+#include "log.h"
+#include "sys-strings.h"
+#include "inet_ntop_cache.h"
+
+#define CORE_PLUGIN "mod_proxy_core"
+
+#define TORA_HEADER_LEN 4
+
+#define BUFSIZE (1 << 16)
+
+#define PARSE_HEADER(start,cursor) \
+ cursor = start; \
+ if( *cursor == '"' ) { \
+ start++; \
+ cursor++; \
+ while( *cursor != '"' && *cursor != 0 ) \
+ cursor++; \
+ } else { \
+ while( *cursor != 0 && *cursor != '\r' && *cursor != '\n' && *cursor != '\t' ) \
+ cursor++; \
+ }
+
+typedef struct {
+ PLUGIN_DATA;
+
+ proxy_protocol *protocol;
+} protocol_plugin_data;
+
+typedef struct {
+ size_t len;
+ off_t offset;
+ int type;
+} tora_packet;
+
+typedef struct {
+ size_t len;
+ off_t offset;
+ buffer *buf;
+ buffer *boundary;
+ chunkqueue *tmp;
+ int name_sent;
+} multipart;
+
+/**
+ * The tora protocol decoder will use this struct for storing state variables
+ * used in decoding the stream
+ */
+typedef struct {
+ buffer *buf; /* holds raw header bytes or used to buffer STDERR */
+ off_t offset; /* parse offset into buffer. */
+ tora_packet packet; /* parsed info about current packet. */
+ size_t chunk_len; /* chunk length */
+ size_t requested_bytes; /* backend requested we send this many bytes of the request content. */
+ buffer *hdkey; /* temporary holds header key */
+ multipart multipart; /* holds multipart data */
+} tora_state_data;
+
+typedef enum {
+ CODE_FILE = 1,
+ CODE_URI,
+ CODE_CLIENT_IP,
+ CODE_GET_PARAMS,
+ CODE_POST_DATA,
+ CODE_HEADER_KEY,
+ CODE_HEADER_VALUE,
+ CODE_HEADER_ADD_VALUE,
+ CODE_PARAM_KEY,
+ CODE_PARAM_VALUE,
+ CODE_HOST_NAME,
+ CODE_HTTP_METHOD,
+ CODE_EXECUTE,
+ CODE_ERROR,
+ CODE_PRINT,
+ CODE_LOG,
+ CODE_FLUSH,
+ CODE_REDIRECT,
+ CODE_RETURNCODE,
+ CODE_QUERY_MULTIPART,
+ CODE_PART_FILENAME,
+ CODE_PART_KEY,
+ CODE_PART_DATA,
+ CODE_PART_DONE,
+ CODE_TEST_CONNECT,
+} proto_code;
+
+typedef struct {
+ unsigned char type;
+ unsigned char lengthB0;
+ unsigned char lengthB1;
+ unsigned char lengthB2;
+} TORA_Header;
+
+
+void tora_encode_header(buffer *b, proto_code code,int len ) {
+ unsigned char h[4];
+ h[0] = (unsigned char)code;
+ h[1] = (unsigned char)len;
+ h[2] = (unsigned char)(len >> 8);
+ h[3] = (unsigned char)(len >> 16);
+ buffer_append_string_len(b,(char*)h,4);
+}
+void tora_encode_size(buffer *b, proto_code code, const char *str, int len ) {
+ tora_encode_header(b,code,len);
+ buffer_append_string_len(b,str,len);
+}
+void tora_encode(buffer *b, proto_code code, const char *str ) {
+ tora_encode_size(b,code,str,(int)strlen(str));
+}
+
+static char *memfind( char *mem, int mlen, const char *v ) {
+ char *found;
+ int len = (int)strlen(v);
+ if( len == 0 )
+ return mem;
+ while( (found = memchr(mem,*v,mlen)) != NULL ) {
+ if( (int)(found - mem) + len > mlen )
+ break;
+ if( memcmp(found,v,len) == 0 )
+ return found;
+ mlen -= (int)(found - mem + 1);
+ mem = found + 1;
+ }
+ return NULL;
+}
+
+int url_decode( const char *bin, int len, char *bout ) {
+ int pin = 0;
+ int pout = 0;
+ while( len-- > 0 ) {
+ char c = bin[pin++];
+ if( c == '+' )
+ c = ' ';
+ else if( c == '%' ) {
+ int p1, p2;
+ if( len < 2 )
+ break;
+ p1 = bin[pin++];
+ p2 = bin[pin++];
+ len -= 2;
+ if( p1 >= '0' && p1 <= '9' )
+ p1 -= '0';
+ else if( p1 >= 'a' && p1 <= 'f' )
+ p1 -= 'a' - 10;
+ else if( p1 >= 'A' && p1 <= 'F' )
+ p1 -= 'A' - 10;
+ else
+ continue;
+ if( p2 >= '0' && p2 <= '9' )
+ p2 -= '0';
+ else if( p2 >= 'a' && p2 <= 'f' )
+ p2 -= 'a' - 10;
+ else if( p2 >= 'A' && p2 <= 'F' )
+ p2 -= 'A' - 10;
+ else
+ continue;
+ c = (char)((unsigned char)((p1 << 4) + p2));
+ }
+ bout[pout++] = c;
+ }
+ bout[pout] = 0;
+ return pout;
+}
+
+#define DEFAULT_SIZE 256
+
+static void tora_param_encode( buffer *b, proto_code code, const char *str, int len ) {
+ char tmp[DEFAULT_SIZE];
+ char *buf = NULL;
+ int size;
+ if( len >= DEFAULT_SIZE )
+ buf = malloc(len+1);
+ size = url_decode(str,len,buf?buf:tmp);
+ tora_encode_size(b,code,buf?buf:tmp,size);
+ if( buf )
+ free(buf);
+}
+
+void tora_parse_params(buffer *b, buffer *r) {
+ char *aand, *aeq, *asep;
+ char *args = BUF_STR(b);
+
+ while( TRUE ) {
+ aand = strchr(args,'&');
+ if( aand == NULL ) {
+ asep = strchr(args,';');
+ aand = asep;
+ } else {
+ asep = strchr(args,';');
+ if( asep != NULL && asep < aand )
+ aand = asep;
+ }
+ if( aand != NULL )
+ *aand = 0;
+ aeq = strchr(args,'=');
+ if( aeq != NULL ) {
+ *aeq = 0;
+ tora_param_encode(r,CODE_PARAM_KEY,args,(int)(aeq-args));
+ tora_param_encode(r,CODE_PARAM_VALUE,aeq+1,(int)strlen(aeq+1));
+ *aeq = '=';
+ }
+ if( aand == NULL )
+ break;
+ *aand = (aand == asep)?';':'&';
+ args = aand+1;
+ }
+}
+
+tora_state_data *tora_state_data_init(void) {
+ tora_state_data *data;
+
+ data = calloc(1, sizeof(*data));
+ data->buf = buffer_init();
+ data->hdkey = buffer_init();
+ data->multipart.buf = buffer_init();
+ data->multipart.boundary = buffer_init();
+ data->multipart.tmp = chunkqueue_init();
+ return data;
+}
+
+PROXY_CONNECTION_FUNC(proxy_tora_init) {
+
+ UNUSED(srv);
+
+ if(!proxy_con->protocol_data) {
+ proxy_con->protocol_data = tora_state_data_init();
+ }
+ return 1;
+}
+
+void tora_state_data_free(tora_state_data *data) {
+ buffer_free(data->buf);
+ buffer_free(data->hdkey);
+ buffer_free(data->multipart.boundary);
+ buffer_free(data->multipart.buf);
+ chunkqueue_free(data->multipart.tmp);
+ free(data);
+}
+
+void tora_state_data_reset(tora_state_data *data) {
+ buffer_reset(data->buf);
+ buffer_reset(data->multipart.buf);
+ buffer_reset(data->multipart.boundary);
+ chunkqueue_reset(data->multipart.tmp);
+ data->packet.len = 0;
+ data->packet.offset = 0;
+ data->packet.type = 0;
+ data->offset = 0;
+ data->chunk_len = 0;
+ data->multipart.offset = 0;
+ data->multipart.name_sent = 0;
+}
+
+PROXY_CONNECTION_FUNC(proxy_tora_cleanup) {
+
+ UNUSED(srv);
+
+ if(proxy_con->protocol_data) {
+ tora_state_data_free((tora_state_data *)proxy_con->protocol_data);
+ proxy_con->protocol_data = NULL;
+ }
+ return 1;
+}
+
+/*
+ * copy len bytes from chunk-chain into buffer
+ */
+static int proxy_tora_fill_buffer(tora_state_data *data, chunkqueue *in, size_t len) {
+ off_t we_have = 0, we_need = len;
+ chunk *c;
+
+ buffer_prepare_append(data->buf, we_need);
+ for (c = in->first; c && we_need > 0; c = c->next) {
+ if(c->mem->used == 0) continue;
+ we_have = c->mem->used - c->offset - 1;
+ if (we_have == 0) continue;
+ if (we_have > we_need) we_have = we_need;
+ buffer_append_string_len(data->buf, c->mem->ptr + c->offset, we_have);
+ data->packet.offset += we_have;
+ c->offset += we_have;
+ in->bytes_out += we_have;
+ we_need -= we_have;
+ }
+ return we_need;
+}
+
+
+/*
+ * copy len bytes from chunk-chain into buffer
+ */
+static int proxy_tora_fill_mbuffer(tora_state_data *data, chunkqueue *in, size_t len) {
+ off_t we_have = 0, we_need = len;
+ chunk *c;
+
+ for (c = in->first; c && we_need > 0; c = c->next) {
+ if(c->type == FILE_CHUNK) {
+ we_have = c->file.length - c->offset;
+ if (we_have > we_need) we_have = we_need;
+ if (c->file.mmap.start == MAP_FAILED) {
+ if (-1 == c->file.fd && /* open the file if not already open */
+ -1 == (c->file.fd = open(c->file.name->ptr, O_RDONLY | O_BINARY))) {
+ ERROR("open(%s) failed: %s", BUF_STR(c->file.name), strerror(errno));
+ return -2;
+ }
+ if (MAP_FAILED == (c->file.mmap.start = mmap(0, c->file.length, PROT_READ, MAP_SHARED, c->file.fd, 0))) {
+ ERROR("mmap(%s) failed: %s", BUF_STR(c->file.name), strerror(errno));
+ return -2;
+ }
+ close(c->file.fd);
+ c->file.fd = -1;
+ c->file.mmap.length = c->file.length;
+ }
+ buffer_append_string_len(data->multipart.buf, c->file.mmap.start + c->offset, we_have);
+ } else {
+ if(c->mem->used == 0) continue;
+ we_have = c->mem->used - c->offset - 1;
+ if (we_have == 0) continue;
+ if (we_have > we_need) we_have = we_need;
+ buffer_append_string_len(data->multipart.buf, c->mem->ptr + c->offset, we_have);
+ }
+ c->offset += we_have;
+ in->bytes_out += we_have;
+ we_need -= we_have;
+ }
+ return we_need;
+}
+
+
+static int proxy_tora_stream_encoder_multipart (proxy_session *sess, unsigned int bufsize) {
+ proxy_connection *proxy_con = sess->proxy_con;
+ chunkqueue *in = sess->remote_con->recv;
+ chunkqueue *out = proxy_con->send;
+ tora_state_data *data = (tora_state_data *)proxy_con->protocol_data;
+ size_t we_need = 0;
+ buffer *buf;
+ char *name, *end_name, *filename = NULL, *end_file_name = NULL, *body = NULL;
+ int len = 0;
+ char *boundstr = NULL;
+ unsigned int boundstr_len;
+ const char *boundary, *bend;
+
+ if(buffer_is_empty(data->multipart.boundary) ) {
+
+ data_string *ds;
+
+ ds = (data_string *) array_get_element(sess->remote_con->request.headers, CONST_STR_LEN("Content-Type"));
+ if( (boundary = strstr(BUF_STR(ds->value),"boundary=")) == NULL ) {
+ return HANDLER_FINISHED;
+ }
+ boundary += 9;
+ PARSE_HEADER(boundary,bend);
+ len = (int)(bend - boundary);
+ boundstr_len = len + 2;
+ if( boundstr_len > bufsize / 2 )
+ return HANDLER_GO_ON;
+ boundstr = (char*)malloc(boundstr_len + 1);
+ boundstr[0] = '-';
+ boundstr[1] = '-';
+ boundstr[boundstr_len] = 0;
+ memcpy(boundstr+2,boundary,len);
+ buffer_copy_string(data->multipart.boundary,boundstr);
+ free(boundstr);
+ TRACE("boundary %s",data->multipart.boundary->ptr);
+ }
+
+ if(data->multipart.buf->used < bufsize) {
+ we_need = proxy_tora_fill_mbuffer(data, in, bufsize - data->multipart.buf->used);
+ }
+
+ if(data->multipart.name_sent == 0) {
+ /* send name & filename */
+ buf = chunkqueue_get_append_buffer(out);
+ len = data->multipart.buf->used;
+ name = memfind(data->multipart.buf->ptr + data->multipart.offset ,len,"Content-Disposition:");
+ if( name == NULL ) {
+ ERROR("%s","no content disposition in multipart data");
+ chunkqueue_remove_finished_chunks(in);
+ buffer_reset(data->multipart.buf);
+ return HANDLER_FINISHED;
+ }
+ name = memfind(name, len - (int)(name - data->multipart.buf->ptr),"name=");
+ if( name == NULL ) {
+ ERROR("%s","no content name in multipart data");
+ chunkqueue_remove_finished_chunks(in);
+ buffer_reset(data->multipart.buf);
+ return HANDLER_GO_ON;
+ }
+ name += 5;
+ PARSE_HEADER(name,end_name);
+ body = memfind(end_name, len - (int)(end_name - data->multipart.buf->ptr),"\r\n\r\n");
+ if( body == NULL ) {
+ ERROR("%s","corrupted multipart data");
+ chunkqueue_remove_finished_chunks(in);
+ buffer_reset(data->multipart.buf);
+ return HANDLER_GO_ON;
+ }
+ filename = memfind(name,(int)(body - name),"filename=");
+ if( filename != NULL ) {
+ filename += 9;
+ PARSE_HEADER(filename,end_file_name);
+ }
+ body += 4;
+ data->multipart.offset = (int)(body - data->multipart.buf->ptr);
+ if( filename )
+ tora_encode_size(buf,CODE_PART_FILENAME, filename,(int)(end_file_name - filename));
+ tora_encode_size(buf,CODE_PART_KEY, name,(int)(end_name - name));
+ out->bytes_in += buf->used - 1;
+ data->multipart.name_sent = 1;
+
+ buffer *b = buffer_init();
+ buffer_copy_string_len(b,data->multipart.buf->ptr + data->multipart.offset,data->multipart.buf->used - data->multipart.offset - 1);
+ buffer_reset(data->multipart.buf);
+ buffer_copy_string_buffer(data->multipart.buf,b);
+ buffer_free(b);
+
+ data->multipart.offset = 0;
+ return HANDLER_GO_ON;
+ }
+
+ buf = buffer_init();
+
+ boundary = memfind(data->multipart.buf->ptr + data->multipart.offset ,data->multipart.buf->used - data->multipart.offset,BUF_STR(data->multipart.boundary));
+ if(boundary == NULL) {
+ tora_encode_size(buf,CODE_PART_DATA, data->multipart.buf->ptr,data->multipart.buf->used-1);
+ out->bytes_in += buf->used -1;
+ chunkqueue_append_buffer(out,buf);
+ buffer_free(buf);
+
+ buffer_reset(data->multipart.buf);
+ data->multipart.offset = 0;
+ return HANDLER_GO_ON;
+ } else {
+ tora_encode_size(buf,CODE_PART_DATA, data->multipart.buf->ptr + data->multipart.offset,(int)(boundary - data->multipart.buf->ptr) - data->multipart.offset - 2);
+ tora_encode_size(buf,CODE_PART_DONE,NULL,0);
+ out->bytes_in += buf->used -1;
+ chunkqueue_append_buffer(out,buf);
+ buffer_free(buf);
+
+ data->multipart.offset = (int)(boundary - data->multipart.buf->ptr) + strlen(BUF_STR(data->multipart.boundary));
+ data->multipart.name_sent = 0;
+
+ if(strcmp(data->multipart.buf->ptr + data->multipart.offset ,"--\r\n") == 0 ) {
+ buffer_reset(data->multipart.buf);
+ buffer_reset(data->multipart.boundary);
+ return HANDLER_FINISHED;
+ }
+ }
+
+ return HANDLER_GO_ON;
+}
+
+PROXY_STREAM_DECODER_FUNC(proxy_tora_stream_decoder_internal) {
+ proxy_connection *proxy_con = sess->proxy_con;
+ chunkqueue *in = proxy_con->recv;
+ tora_state_data *data = (tora_state_data *)proxy_con->protocol_data;
+ size_t we_have = 0, we_need = 0;
+ TORA_Header *header;
+ handler_t rc = HANDLER_GO_ON;
+ char buf[32];
+ int res;
+
+ UNUSED(srv);
+
+ if ((in->bytes_in == in->bytes_out) && in->is_closed) {
+ /* everything got passed through,
+ *
+ * as we usually have a FIN packet we should expect to get a is_closed within the
+ * tora stream. Looks like the remote side crashed
+ * */
+ out->is_closed = 1;
+
+ TRACE("%ld / %ld -> %d",
+ in->bytes_in, in->bytes_out,
+ in->is_closed);
+
+ ERROR("looks like the tora backend (%s) terminated before it sent a FIN packet", BUF_STR(sess->request_uri));
+
+ return HANDLER_FINISHED;
+ }
+
+ /* no data ? */
+ if (!in->first) return HANDLER_GO_ON;
+
+ if(data->packet.type == 0) {
+ we_need = (TORA_HEADER_LEN - data->packet.offset);
+ we_need = proxy_tora_fill_buffer(data, in, we_need);
+ if(we_need > 0) {
+ /* we need more data to parse the header. */
+ chunkqueue_remove_finished_chunks(in);
+ return HANDLER_GO_ON;
+ }
+ header = (TORA_Header *)(data->buf->ptr);
+ data->packet.type = header->type;
+ data->packet.len = ((header->lengthB0 | header->lengthB1 << 8) | header->lengthB2 << 16);
+ data->packet.offset = 0;
+
+ /* Finished parsing raw header bytes. */
+ buffer_reset(data->buf);
+ }
+
+ /* proccess the packet's contents. */
+ we_need = data->packet.len - data->packet.offset;
+
+ /* for most packet types copy the content into the data buffer */
+ if (data->packet.type != CODE_PRINT) {
+ if(we_need > 0) {
+ /* copy tora packet contents to buffer */
+ we_need = proxy_tora_fill_buffer(data, in, we_need);
+ /* make sure we have the full tora packet content. */
+ if(we_need > 0) {
+ chunkqueue_remove_finished_chunks(in);
+ return HANDLER_GO_ON;
+ }
+ }
+ }
+
+ switch(data->packet.type) {
+ case CODE_EXECUTE:
+ data->packet.offset = 0;
+ in->is_closed = 1;
+ out->is_closed = 1;
+ sess->is_request_finished = 1;
+ proxy_con->send->is_closed = 1;
+ rc = HANDLER_FINISHED;
+ break;
+ case CODE_HEADER_KEY:
+ buffer_copy_string_buffer(data->hdkey,data->buf);
+ break;
+ case CODE_HEADER_VALUE:
+ array_append_key_value(sess->resp->headers, CONST_BUF_LEN(data->hdkey), CONST_BUF_LEN(data->buf));
+ break;
+ case CODE_HEADER_ADD_VALUE:
+ array_append_key_value(sess->resp->headers, CONST_BUF_LEN(data->hdkey), CONST_BUF_LEN(data->buf));
+ break;
+ case CODE_PRINT:
+ LI_ltostr(buf, we_need);
+ array_append_key_value(sess->resp->headers, CONST_STR_LEN("Content-Length"), buf, strlen(buf));
+ if(sess->have_response_headers < 1) {
+ if(sess->resp->status == -1) {
+ sess->resp->status = 200;
+ }
+ if(array_get_element(sess->resp->headers, CONST_STR_LEN("Content-Type")) == NULL) {
+ array_append_key_value(sess->resp->headers, CONST_STR_LEN("Content-Type"),CONST_STR_LEN("text/html"));
+ }
+ sess->have_response_headers = 1;
+ }
+ we_have = chunkqueue_steal_chunks_len(out, in->first, we_need);
+ we_need -= we_have;
+ in->bytes_out += we_have;
+ out->bytes_in += we_have;
+ break;
+ case CODE_LOG:
+ TRACE("[mod_tora] %s", BUF_STR(data->buf));
+ break;
+ case CODE_ERROR:
+ TRACE("[mod_tora] Error: %s", BUF_STR(data->buf));
+ rc = HANDLER_ERROR;
+ break;
+ case CODE_FLUSH:
+ /* TODO: how does lighty flush output ??? */
+ TRACE("[mod_tora] NOTICE: %s","flush not implemented");
+ break;
+ case CODE_REDIRECT:
+ array_append_key_value(sess->resp->headers, CONST_STR_LEN("Location"), CONST_BUF_LEN(data->buf));
+ sess->resp->status = 301;
+ break;
+ case CODE_RETURNCODE:
+ sess->resp->status = atoi(BUF_STR(data->buf));
+ break;
+ case CODE_QUERY_MULTIPART:
+ chunkqueue_reset(proxy_con->send);
+ do {
+ /* decode the packet */
+ res = proxy_tora_stream_encoder_multipart(sess,atoi(BUF_STR(data->buf)));
+ } while (proxy_con->recv->first && res == HANDLER_GO_ON);
+ rc = res;
+ break;
+ default:
+ TRACE("[mod_tora] %s","Unexpected code");
+ rc = HANDLER_ERROR;
+ break;
+ }
+
+ if(we_need == 0) {
+ /* packet finished, reset state for next packet */
+ tora_state_data_reset(data);
+ }
+
+ chunkqueue_remove_finished_chunks(in);
+
+ return rc;
+}
+
+PROXY_STREAM_DECODER_FUNC(proxy_tora_stream_decoder) {
+ proxy_connection *proxy_con = sess->proxy_con;
+ chunkqueue *in = proxy_con->recv;
+ int res;
+
+ if(out->is_closed) return 1;
+ /* decode the whole packet stream */
+ do {
+ /* decode the packet */
+ res = proxy_tora_stream_decoder_internal(srv, sess, out);
+ } while (in->first && res == HANDLER_GO_ON);
+
+ return res;
+}
+
+
+/**
+ * transform the content-stream into a valid TORA content-stream
+ *
+ * as we don't apply chunked-encoding here, pass it on AS IS
+ */
+PROXY_STREAM_ENCODER_FUNC(proxy_tora_stream_encoder) {
+ proxy_connection *proxy_con = sess->proxy_con;
+ connection *con = sess->remote_con;
+ chunkqueue *out = proxy_con->send;
+ handler_t rc = HANDLER_GO_ON;
+ size_t we_have = 0;
+ buffer *b, *bhm = buffer_init();
+ chunk *c;
+
+ UNUSED(srv);
+
+ /* output queue closed, can't encode any more data. */
+ if(out->is_closed) return HANDLER_FINISHED;
+
+ b = chunkqueue_get_append_buffer(out);
+
+ buffer_copy_string(bhm,get_http_method_name(con->request.http_method));
+
+ if (buffer_is_empty(con->uri.query)) {
+ char *qstr = malloc(con->request.uri->used + 1);
+ if (NULL != (qstr = strchr(con->request.uri->ptr, '?'))) {
+ /** extract query string from request.uri */
+ buffer_copy_string(con->uri.query, qstr + 1);
+ }
+ free(qstr);
+ }
+ if (!buffer_is_empty(con->uri.query)) {
+ tora_encode(b,CODE_GET_PARAMS, BUF_STR(con->uri.query));
+ tora_parse_params(con->uri.query,b);
+ }
+ if (strcmp(bhm->ptr,"POST") == 0) {
+
+ unsigned int is_multipart = 0;
+ data_string *ds;
+
+ ds = (data_string *) array_get_element(con->request.headers, CONST_STR_LEN("Content-Type"));
+ if(strstr(BUF_STR(ds->value),"multipart/form-data"))
+ is_multipart = 1;
+
+ if(is_multipart == 0) {
+ if(chunkqueue_length(in)> BUFSIZE) {
+ ERROR("[mod_tora] %s","Maximum POST data exceeded. Try using multipart encoding");
+ } else {
+
+ chunkqueue *tmp = chunkqueue_init();
+ buffer *r = buffer_init();
+
+ we_have = chunkqueue_steal_all_chunks(tmp, in);
+ in->bytes_out += we_have;
+ tmp->bytes_in += we_have;
+
+ for (c = tmp->first; c; c = c->next) {
+ buffer_append_string(r,c->mem->ptr + c->offset);
+ }
+
+ chunkqueue_free(tmp);
+
+ tora_encode(b,CODE_HTTP_METHOD, bhm->ptr);
+ tora_encode(b,CODE_POST_DATA, BUF_STR(r));
+ tora_parse_params(r,b);
+
+ buffer_free(r);
+ }
+ } else {
+ tora_encode_size(b,CODE_EXECUTE, NULL,0);
+ rc = HANDLER_FINISHED;
+ }
+
+ } else {
+ tora_encode(b,CODE_HTTP_METHOD, bhm->ptr);
+ }
+
+ buffer_free(bhm);
+
+ if (in->bytes_in == in->bytes_out && in->is_closed) {
+ out->is_closed = 1;
+ tora_encode_size(b,CODE_EXECUTE, NULL,0);
+ return HANDLER_FINISHED;
+ }
+
+ return rc;
+}
+
+/**
+ * generate a HTTP/1.1 proxy request from the set of request-headers
+ *
+ */
+PROXY_STREAM_ENCODER_FUNC(proxy_tora_encode_request_headers) {
+ proxy_connection *proxy_con = sess->proxy_con;
+ chunkqueue *out = proxy_con->send;
+ connection *con = sess->remote_con;
+ buffer *b;
+ unsigned int i;
+
+ //UNUSED(srv);
+ UNUSED(in);
+
+ b = chunkqueue_get_append_buffer(out);
+
+ tora_encode(b,CODE_FILE,BUF_STR(con->physical.path));
+ tora_encode(b,CODE_URI,BUF_STR(con->uri.path));
+ tora_encode(b,CODE_HOST_NAME,BUF_STR(con->server_name));
+ tora_encode(b,CODE_CLIENT_IP, inet_ntop_cache_get_ip(srv, &(con->dst_addr)));
+
+ for (i = 0; i < con->request.headers->used; i++) {
+ data_string *ds;
+ ds = (data_string *)con->request.headers->data[i];
+ if (buffer_is_empty(ds->value) || buffer_is_empty(ds->key)) continue;
+ tora_encode(b,CODE_HEADER_KEY,BUF_STR(ds->key));
+ tora_encode(b,CODE_HEADER_VALUE,BUF_STR(ds->value));
+ }
+
+ out->bytes_in += b->used - 1;
+
+ return HANDLER_FINISHED;
+}
+
+INIT_FUNC(mod_proxy_backend_tora_init) {
+ mod_proxy_core_plugin_data *core_data;
+ protocol_plugin_data *p;
+
+ /* get the plugin_data of the core-plugin */
+ core_data = plugin_get_config(srv, CORE_PLUGIN);
+ if(!core_data) return NULL;
+
+ p = calloc(1, sizeof(*p));
+
+ /* define protocol handler callbacks */
+ p->protocol = (core_data->proxy_register_protocol)("tora");
+
+ p->protocol->proxy_stream_init = proxy_tora_init;
+ p->protocol->proxy_stream_cleanup = proxy_tora_cleanup;
+ p->protocol->proxy_stream_decoder = proxy_tora_stream_decoder;
+ p->protocol->proxy_stream_encoder = proxy_tora_stream_encoder;
+ p->protocol->proxy_encode_request_headers = proxy_tora_encode_request_headers;
+
+ return p;
+}
+
+FREE_FUNC(mod_proxy_backend_tora_free) {
+ protocol_plugin_data *p = p_d;
+
+ UNUSED(srv);
+
+ if (!p) return HANDLER_GO_ON;
+
+ free(p);
+
+ return HANDLER_GO_ON;
+}
+
+LI_EXPORT int mod_proxy_backend_tora_plugin_init(plugin *p) {
+ data_string *ds;
+
+ p->version = LIGHTTPD_VERSION_ID;
+ p->name = buffer_init_string("mod_proxy_backend_tora");
+
+ p->init = mod_proxy_backend_tora_init;
+ p->cleanup = mod_proxy_backend_tora_free;
+
+ p->data = NULL;
+
+ ds = data_string_init();
+ buffer_copy_string_len(ds->value, CONST_STR_LEN(CORE_PLUGIN));
+ array_insert_unique(p->required_plugins, (data_unset *)ds);
+
+ return 0;
+}
+
+
--
Neko : One VM to run them all
(http://nekovm.org)