Update of /cvsroot/monetdb/sql/src/backends/monet5
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv9483
Modified Files:
merovingian.mx
Log Message:
After an interesting talk with Arjen, I decided that a big missing
feature of Merovingian is to hide how the server(s) are accessed by just
having one port (think firewall) that serves all.
Hence, here now, at this very moment (after a full day of chasing stream
problems) we have a full reincarnation of the MDatabasePool: a proxying
wrapper instance.
I simply extended the (Armada influenced) redirect list with a proxy
redirect "offer". Sent as first option currently, to make the naive
apprroach in JDBC (take the first and forget about the rest) benefit
from this proxy. It needs configuration and finetuning though.
Protocol wise, there is just a new redirect URI scheme "merovingian"
with as scheme-specific-part the text "proxy". A client should
interpret is as "reuse the existing connection and just start logging in
again".
This proxying opens up loads of opportunities, and a can of worms for
mclient users; most probably the utility will bark that it doesn't
understand the first redirect.
U merovingian.mx
Index: merovingian.mx
===================================================================
RCS file: /cvsroot/monetdb/sql/src/backends/monet5/merovingian.mx,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -d -r1.31 -r1.32
--- merovingian.mx 13 Mar 2008 12:17:20 -0000 1.31
+++ merovingian.mx 27 Mar 2008 16:12:37 -0000 1.32
@@ -586,24 +586,177 @@
return(newErr(strerror(errno)));
}
+typedef struct _merovingian_proxy {
+ stream *in; /* the input to read from and to dispatch to out */
+ stream *out; /* where to write the read input to */
+ stream *co_in; /* the input stream of the co-thread,
+ don't read from this stream! close only */
+ stream *co_out; /* the output stream of the co-thread,
+ don't write to this stream! close only */
+ char *name; /* a description to log when this thread ends */
+} merovingian_proxy;
+
+static void
+proxyThread(void *d)
+{
+ merovingian_proxy *p = (merovingian_proxy *)d;
+ int len;
+ char data[8 * 1024];
+
+ /* pass everything from in to out, until either reading from in, or
+ * writing to out fails, then close in and its related out-stream
+ * (not out!) to make sure the co-thread dies as well */
+ while ((len = stream_read(p->in, data, 1, sizeof(data))) >= 0) {
+ if (len > 0 && stream_write(p->out, data, len, 1) != 1)
+ break;
+ if (len == 0 && stream_flush(p->out) == -1)
+ break;
+ }
+
+ stream_close(p->co_out); /* out towards target B */
+ stream_close(p->in); /* related in from target B */
+
+ stream_close(p->out); /* out towards target A */
+ stream_close(p->co_in); /* related in from target A */
+
+ if (p->name != NULL) {
+ /* name is only set on the client-to-server thread */
+ if (len <= 0) {
+ merlog("client has %s disconnected from proxy",
p->name);
+ } else {
+ merlog("server has terminated proxy connection,
disconnecting client %s", p->name);
+ }
+ GDKfree(p->name);
+ }
+ GDKfree(p);
+}
+
+static err
+startProxy(stream *cfdin, stream *cfout, char *url, char *client)
+{
+ struct hostent *hp;
+ struct sockaddr_in server;
+ struct sockaddr *serv;
+ socklen_t servsize;
+ int ssock;
+ char *port, *t;
+ char *conn;
+ stream *sfdin, *sfout;
+ pthread_t ctos = 0, stoc = 0; /* actually not used */
+ merovingian_proxy *pctos, *pstoc;
+
+ /* quick 'n' dirty parsing */
+ if (strncmp(url, "mapi:monetdb://", sizeof("mapi:monetdb://") - 1) ==
0) {
+ conn = GDKstrdup(url);
+ conn += sizeof("mapi:monetdb://") - 1;
+ /* drop anything off after the hostname */
+ if ((port = strchr(conn, ':')) != NULL) {
+ *port = '\0';
+ port++;
+ if ((t = strchr(port, '/')) != NULL)
+ *t = '\0';
+ } else {
+ return(newErr("can't find a port in redirect, "
+ "this is not going to work:
%s", url));
+ }
+ } else {
+ return(newErr("unsupported protocol/scheme in redirect: %s",
url));
+ }
+
+ hp = gethostbyname(conn);
+ if (hp == NULL)
+ return(newErr("cannot get address for hostname '%s': %s",
+ conn, strerror(errno)));
+
+ memset(&server, 0, sizeof(server));
+ memcpy(&server.sin_addr, hp->h_addr, hp->h_length);
+ server.sin_family = hp->h_addrtype;
+ server.sin_port = htons((unsigned short) (atoi(port) & 0xFFFF));
+ serv = (struct sockaddr *) &server;
+ servsize = sizeof(server);
+
+ ssock = socket(serv->sa_family, SOCK_STREAM, IPPROTO_TCP);
+ if (ssock == INVALID_SOCKET)
+ return(newErr("failed to open socket: %s", strerror(errno)));
+
+ if (connect(ssock, serv, servsize) < 0)
+ return(newErr("failed to connect: %s", strerror(errno)));
+
+ sfdin = block_stream(socket_rastream(ssock, "merovingian<-server (proxy
read)"));
+ sfout = block_stream(socket_wastream(ssock, "merovingian->server (proxy
write)"));
+
+ if (sfdin == 0 || sfout == 0) {
+ stream_close(sfout);
+ stream_close(sfdin);
+ return(newErr("merovingian-server inputstream or outputstream
problems"));
+ }
+
+ /* our proxy schematically looks like this:
+ *
+ * A___>___B
+ * out in | | out in
+ * client --------- | M | --------- server
+ * in out |_____| in out
+ * C < D
+ *
+ * the thread that does A -> B is called ctos, C -> D stoc
+ * the merovingian_proxy structs are filled like:
+ * ctos: in = A, out = B, co_in = D, co_out = C
+ * stoc: in = D, out = C, co_in = A, co_out = B
+ */
+
+ pctos = GDKmalloc(sizeof(merovingian_proxy));
+ pctos->in = cfdin;
+ pctos->out = sfout;
+ pctos->co_in = sfdin;
+ pctos->co_out = cfout;
+ pctos->name = GDKstrdup(client);
+
+ pstoc = GDKmalloc(sizeof(merovingian_proxy));
+ pstoc->in = sfdin;
+ pstoc->out = cfout;
+ pstoc->co_in = cfdin;
+ pstoc->co_out = sfout;
+ pstoc->name = NULL; /* we want only one log-message on disconnect */
+
+ if (pthread_create(&stoc, NULL, (void *(*)(void *))proxyThread, (void
*)pstoc) < 0)
+ {
+ stream_close(cfout);
+ stream_close(cfdin);
+ return(newErr("failed to create proxy thread"));
+ }
+
+ if (pthread_create(&ctos, NULL, (void *(*)(void *))proxyThread, (void
*)pctos) < 0)
+ {
+ stream_close(cfout);
+ stream_close(cfdin);
+ return(newErr("failed to create proxy thread"));
+ }
+
+ return(NO_ERR);
+}
+
static err
handleClient(int sock)
{
stream *fdin, *fout;
- bstream *fin;
str buf = alloca(sizeof(char) * 8096);
char *user = NULL, *algo = NULL, *passwd = NULL, *lang = NULL;
char *database = NULL, *s;
+ char *host = NULL;
sabdb *top = NULL;
sabdb *stat = NULL;
struct sockaddr_in saddr;
socklen_t saddrlen = sizeof(struct sockaddr_in);
+ err e;
- fdin = block_stream(socket_rastream(sock, "read"));
- fout = block_stream(socket_wastream(sock, "write"));
+ fdin = block_stream(socket_rastream(sock, "merovingian<-client
(read)"));
+ fout = block_stream(socket_wastream(sock, "merovingian->client
(write)"));
- if (!fdin || !fout) {
- return(newErr("inputstream or outputstream problems"));
+ if (fdin == 0 || fout == 0) {
+ stream_close(fout);
+ stream_close(fdin);
+ return(newErr("merovingian-client inputstream or outputstream
problems"));
}
/* note that we claim to speak proto 8 here */
@@ -618,10 +771,15 @@
);
stream_flush(fout);
/* get response */
- stream_read_block(fdin, buf, 8095, 1);
-
- fin = bstream_create(fdin, 128 * 8096);
- fin->eof = 1;
+ if (stream_read_block(fdin, buf, 8095, 1) < 0) {
+ /* we didn't get a terminated block :/ */
+ e = newErr("client sent challenge in incomplete block: %s",
buf);
+ stream_printf(fout, "!client sent something this server could
not understand, sorry\n", user);
+ stream_flush(fout);
+ stream_close(fout);
+ stream_close(fdin);
+ return(e);
+ }
/* decode BIG/LIT:user:{cypher}passwordchal:lang:database: line */
@@ -630,10 +788,11 @@
s = strchr(user, ':');
if (s) {
*s = 0;
- stream_set_byteorder(fin->s, strcmp(user, "BIG") == 0);
+ /* we don't use this in merovingian */
+ /* stream_set_byteorder(fin->s, strcmp(user, "BIG") == 0); */
user = s + 1;
} else {
- err e = newErr("client challenge error: %s", buf);
+ e = newErr("client challenge error: %s", buf);
stream_printf(fout, "!incomplete challenge '%s'\n", user);
stream_flush(fout);
stream_close(fout);
@@ -648,7 +807,7 @@
passwd = s + 1;
/* decode algorithm, i.e. {plain}mypasswordchallenge */
if (*passwd != '{') {
- err e = newErr("client challenge error: %s", buf);
+ e = newErr("client challenge error: %s", buf);
stream_printf(fout, "!invalid password entry\n");
stream_flush(fout);
stream_close(fout);
@@ -658,7 +817,7 @@
algo = passwd + 1;
s = strchr(algo, '}');
if (!s) {
- err e = newErr("client challenge error: %s", buf);
+ e = newErr("client challenge error: %s", buf);
stream_printf(fout, "!invalid password entry\n");
stream_flush(fout);
stream_close(fout);
@@ -668,7 +827,7 @@
*s = 0;
passwd = s + 1;
} else {
- err e = newErr("client challenge error: %s", buf);
+ e = newErr("client challenge error: %s", buf);
stream_printf(fout, "!incomplete challenge '%s'\n", user);
stream_flush(fout);
stream_close(fout);
@@ -682,7 +841,7 @@
*s = 0;
lang = s + 1;
} else {
- err e = newErr("client challenge error: %s", buf);
+ e = newErr("client challenge error: %s", buf);
stream_printf(fout, "!incomplete challenge, missing
language\n");
stream_flush(fout);
stream_close(fout);
@@ -710,8 +869,7 @@
stream_close(fdin);
return(newErr("no database specified"));
} else {
- err e = forkMserver(database, &top, 0);
- if (e != NO_ERR) {
+ if ((e = forkMserver(database, &top, 0)) != NO_ERR) {
if (top == NULL) {
stream_printf(fout, "!no such database '%s',
please create it first\n", database);
} else {
@@ -727,7 +885,7 @@
/* if we can't redirect, our mission ends here */
if (stat->conns == NULL || stat->conns->val == NULL) {
- err e = newErr("database '%s' does not allow connections",
stat->dbname);
+ e = newErr("database '%s' does not allow connections",
stat->dbname);
stream_printf(fout, "!database '%s' does not allow
connections\n", stat->dbname);
stream_flush(fout);
stream_close(fout);
@@ -740,29 +898,39 @@
merlog("couldn't get peername of client: %s", strerror(errno));
merlog("redirecting client for database '%s' to %s",
stat->dbname, stat->conns->val);
+ host = "(unknown)";
} else {
/* avoid doing this, it requires some includes that probably
* give trouble on windowz
host = inet_ntoa(saddr.sin_addr);
*/
- char *host = alloca(sizeof(char) * ((3 + 1 + 3 + 1 + 3 + 1 + 3)
+ 1));
- sprintf(host, "%u.%u.%u.%u",
+ host = alloca(sizeof(char) * ((3 + 1 + 3 + 1 + 3 + 1 + 3 + 1 +
5) + 1));
+ sprintf(host, "%u.%u.%u.%u:%d",
(unsigned) ((ntohl(saddr.sin_addr.s_addr) >>
24) & 0xff),
(unsigned) ((ntohl(saddr.sin_addr.s_addr) >>
16) & 0xff),
(unsigned) ((ntohl(saddr.sin_addr.s_addr) >> 8)
& 0xff),
- (unsigned) (ntohl(saddr.sin_addr.s_addr) &
0xff));
- merlog("redirecting client %s:%d for database '%s' to %s",
- host, (int)ntohs(saddr.sin_port),
- stat->dbname, stat->conns->val);
+ (unsigned) (ntohl(saddr.sin_addr.s_addr) &
0xff),
+ (int)ntohs(saddr.sin_port));
+ merlog("redirecting client %s for database '%s' to proxy or %s",
+ host, stat->dbname, stat->conns->val);
}
- /* need to send a redirect */
- stream_printf(fout, "^%s%s\n",
+ /* need to send a redirect, we send two, first a proxy connect,
+ * second an ordinary reconnection to the database */
+ stream_printf(fout, "^mapi:merovingian:proxy\n^%s%s\n",
stat->conns->val, stat->dbname);
- /* flush redirect and return */
+ /* flush redirect */
stream_flush(fout);
- stream_close(fout);
- stream_close(fdin);
+ /* wait for input, or disconnect in a proxy runner */
+ if ((e = startProxy(fdin, fout, stat->conns->val, host)) != NO_ERR) {
+ stream_printf(fout, "!an internal error has occurred, please
try again later\n");
+ stream_flush(fout);
+ stream_close(fout);
+ stream_close(fdin);
+ merlog("starting a proxy failed: %s", e);
+ SABAOTHfreeStatus(&top);
+ return(e);
+ };
SABAOTHfreeStatus(&top);
return(NO_ERR);
@@ -1361,7 +1529,6 @@
e = openConnection(&sock, port);
if (e == NO_ERR) {
pthread_t ctid = 0;
- int comfd = -1;
/* from this point merovingian considers itself to be in
position to
* start running, so flag the parent we will have fun. */
@@ -1388,9 +1555,9 @@
/* handle external connections main loop */
e = acceptConnections(sock);
- /* shut down the control runner too */
- if (comfd > -1)
- close(comfd);
+ /* connect to ourself to avoid a hang when we close, and the
+ * control channel hasn't been used */
+ fclose(fopen(buf, "a"));
if (ctid != 0)
pthread_join(ctid, NULL);
}
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://ad.doubleclick.net/clk;164216239;13503038;w?http://sf.net/marketplace
_______________________________________________
Monetdb-sql-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-sql-checkins