Index: libs/std/socket.c
===================================================================
RCS file: /cvsroot/neko/libs/std/socket.c,v
retrieving revision 1.64
diff -p -u -r1.64 socket.c
--- libs/std/socket.c	11 Aug 2010 09:07:43 -0000	1.64
+++ libs/std/socket.c	30 Sep 2010 13:08:16 -0000
@@ -29,6 +29,7 @@
 #	include <sys/types.h>
 #	include <sys/socket.h>
 #	include <sys/time.h>
+#	include <sys/ioctl.h>
 #	include <netinet/in.h>
 #	include <netinet/tcp.h>
 #	include <arpa/inet.h>
@@ -55,6 +56,7 @@ typedef struct {
 	char *buf;
 	int size;
 	int ret;
+	int flags;
 } sock_tmp;
 
 typedef struct {
@@ -62,15 +64,19 @@ typedef struct {
 #	ifdef NEKO_WINDOWS
 	struct fd_set *fdr;
 	struct fd_set *fdw;
+	struct fd_set *fde;
 	struct fd_set *outr;
 	struct fd_set *outw;
+	struct fd_set *oute;
 #	else
 	struct pollfd *fds;
 	int rcount;
 	int wcount;
+	int ecount;
 #	endif
 	value ridx;
 	value widx;
+	value eidx;
 } polldata;
 
 DEFINE_KIND(k_socket);
@@ -206,7 +212,7 @@ static value socket_send( value o, value
 
 static void tmp_recv( void *_t ) {
 	sock_tmp *t = (sock_tmp*)_t;
-	t->ret = recv(t->sock,t->buf,t->size,MSG_NOSIGNAL);
+	t->ret = recv(t->sock,t->buf,t->size,t->flags);
 }
 
 /**
@@ -232,6 +238,7 @@ static value socket_recv( value o, value
 		t.sock = val_sock(o);
 		t.buf = val_string(data) + p;
 		t.size = l;
+		t.flags = MSG_NOSIGNAL;
 		neko_thread_blocking(tmp_recv,&t);
 		ret = t.ret;
 	} else
@@ -258,6 +265,7 @@ static value socket_recv_char( value o )
 		t.sock = val_sock(o);
 		t.buf = (char*)&cc;
 		t.size = 1;
+		t.flags = MSG_NOSIGNAL;
 		neko_thread_blocking(tmp_recv,&t);
 		ret = t.ret;
 	} else
@@ -670,40 +678,47 @@ static value socket_poll_alloc( value ns
 	{
 		p->fdr = (fd_set*)alloc_private(FDSIZE(p->max));
 		p->fdw = (fd_set*)alloc_private(FDSIZE(p->max));
+		p->fde = (fd_set*)alloc_private(FDSIZE(p->max));
 		p->outr = (fd_set*)alloc_private(FDSIZE(p->max));
 		p->outw = (fd_set*)alloc_private(FDSIZE(p->max));
+		p->oute = (fd_set*)alloc_private(FDSIZE(p->max));
 		p->fdr->fd_count = 0;
 		p->fdw->fd_count = 0;
+		p->fde->fd_count = 0;
 	}
 #	else
 	p->fds = (struct pollfd*)alloc_private(sizeof(struct pollfd) * p->max);
 	p->rcount = 0;
 	p->wcount = 0;
+	p->ecount = 0;
 #	endif
 	p->ridx = alloc_array(p->max+1);
 	p->widx = alloc_array(p->max+1);
+	p->eidx = alloc_array(p->max+1);
 	for(i=0;i<=p->max;i++) {
 		val_array_ptr(p->ridx)[i] = alloc_int(-1);
 		val_array_ptr(p->widx)[i] = alloc_int(-1);
+		val_array_ptr(p->eidx)[i] = alloc_int(-1);
 	}
 	return alloc_abstract(k_poll, p);
 }
 
 /**
-	socket_poll_prepare : 'poll -> read:'socket array -> write:'socket array -> int array array
+	socket_poll_prepare : 'poll -> read:'socket array -> write:'socket array -> other:'socket array -> int array array
 	<doc>
 	Prepare a poll for scanning events on sets of sockets.
 	</doc>
 **/
-static value socket_poll_prepare( value pdata, value rsocks, value wsocks ) {
+static value socket_poll_prepare( value pdata, value rsocks, value wsocks, value esocks ) {
 	polldata *p;
 	int i,len;
 	val_check(rsocks,array);
 	val_check(wsocks,array);
+	val_check(esocks,array);
 	val_check_kind(pdata,k_poll);
 	p = val_poll(pdata);
 	len = val_array_size(rsocks);
-	if( len + val_array_size(wsocks) > p->max )
+	if( len + val_array_size(wsocks) + val_array_size(esocks) > p->max )
 		val_throw(alloc_string("Too many sockets in poll"));
 #	ifdef NEKO_WINDOWS
 	for(i=0;i<len;i++) {
@@ -719,6 +734,13 @@ static value socket_poll_prepare( value 
 		p->fdw->fd_array[i] = val_sock(s);
 	}
 	p->fdw->fd_count = len;
+	len = val_array_size(esocks);
+	for(i=0;i<len;i++) {
+		value s = val_array_ptr(esocks)[i];
+		val_check_kind(s,k_socket);
+		p->fde->fd_array[i] = val_sock(s);
+	}
+	p->fde->fd_count = len;
 #	else
 	for(i=0;i<len;i++) {
 		value s = val_array_ptr(rsocks)[i];
@@ -738,11 +760,22 @@ static value socket_poll_prepare( value 
 		p->fds[k].revents = 0;
 	}
 	p->wcount = len;
+	len = val_array_size(esocks);
+	for(i=0;i<len;i++) {
+		int k = i + p->rcount + p->wcount;
+		value s = val_array_ptr(esocks)[i];
+		val_check_kind(s,k_socket);
+		p->fds[k].fd = val_sock(s);
+		p->fds[k].events = POLLPRI;
+		p->fds[k].revents = 0;
+	}
+	p->ecount = len;
 #	endif
 	{
-		value a = alloc_array(2);
+		value a = alloc_array(3);
 		val_array_ptr(a)[0] = p->ridx;
 		val_array_ptr(a)[1] = p->widx;
+		val_array_ptr(a)[2] = p->eidx;
 		return a;
 	}
 }
@@ -750,7 +783,7 @@ static value socket_poll_prepare( value 
 /**
 	socket_poll_events : 'poll -> timeout:float -> void
 	<doc>
-	Update the read/write flags arrays that were created with [socket_poll_prepare].
+	Update the read/write/other flags arrays that were created with [socket_poll_prepare].
 	</doc>
 **/
 static value socket_poll_events( value pdata, value timeout ) {
@@ -763,9 +796,10 @@ static value socket_poll_events( value p
 	p = val_poll(pdata);
 	memcpy(p->outr,p->fdr,FDSIZE(p->fdr->fd_count));
 	memcpy(p->outw,p->fdw,FDSIZE(p->fdw->fd_count));
+	memcpy(p->oute,p->fde,FDSIZE(p->fde->fd_count));
 	val_check(timeout,number);
 	init_timeval(val_number(timeout),&t);
-	if( p->fdr->fd_count + p->fdw->fd_count != 0 && select(0,p->outr,p->outw,NULL,&t) == SOCKET_ERROR )
+	if( p->fdr->fd_count + p->fdw->fd_count != 0 && select(0,p->outr,p->outw,p->oute,&t) == SOCKET_ERROR )
 		neko_error();
 	k = 0;
 	for(i=0;i<p->fdr->fd_count;i++)
@@ -777,13 +811,18 @@ static value socket_poll_events( value p
 		if( FD_ISSET(p->fdw->fd_array[i],p->outw) )
 			val_array_ptr(p->widx)[k++] = alloc_int(i);
 	val_array_ptr(p->widx)[k] = alloc_int(-1);
+	k = 0;
+	for(i=0;i<p->fde->fd_count;i++)
+		if( FD_ISSET(p->fde->fd_array[i],p->oute) )
+			val_array_ptr(p->eidx)[k++] = alloc_int(i);
+	val_array_ptr(p->eidx)[k] = alloc_int(-1);
 #else
 	int i,k;
 	int tot;
 	val_check_kind(pdata,k_poll);
 	val_check(timeout,number);
 	p = val_poll(pdata);
-	tot = p->rcount + p->wcount;
+	tot = p->rcount + p->wcount + p->ecount;
 	POSIX_LABEL(poll_events_again);
 	if( poll(p->fds,tot,(int)(val_number(timeout) * 1000)) < 0 ) {
 		HANDLE_EINTR(poll_events_again);
@@ -795,10 +834,15 @@ static value socket_poll_events( value p
 			val_array_ptr(p->ridx)[k++] = alloc_int(i);
 	val_array_ptr(p->ridx)[k] = alloc_int(-1);
 	k = 0;
-	for(;i<tot;i++)
+	for(;i<p->rcount+p->wcount;i++)
 		if( p->fds[i].revents & (POLLOUT|POLLHUP) )
 			val_array_ptr(p->widx)[k++] = alloc_int(i - p->rcount);
 	val_array_ptr(p->widx)[k] = alloc_int(-1);
+	k = 0;
+	for(;i<tot;i++)
+		if( p->fds[i].revents & (POLLPRI|POLLHUP) )
+			val_array_ptr(p->eidx)[k++] = alloc_int(i - p->rcount - p->wcount);
+	val_array_ptr(p->eidx)[k] = alloc_int(-1);
 #endif
 	return val_null;
 }
@@ -815,7 +859,7 @@ static value socket_poll( value socks, v
 	polldata *p;
 	value a;
 	int i, rcount = 0;
-	if( socket_poll_prepare(pdata,socks,alloc_array(0)) == NULL )
+	if( socket_poll_prepare(pdata,socks,alloc_array(0),alloc_array(0)) == NULL )
 		neko_error();
 	if( socket_poll_events(pdata,timeout) == NULL )
 		neko_error();
@@ -844,6 +888,109 @@ static value socket_set_fast_send( value
 	return val_null;
 }
 
+
+/**
+	socket_send_urgent : 'socket -> int -> void
+	<doc>Send the urgent byte over a connected TCP socket.  Must be in the range 0..255</doc>
+**/
+static value socket_send_urgent( value o, value v ) {
+	int c;
+	unsigned char cc;
+	val_check_kind(o,k_socket);
+	val_check(v,int);
+	c = val_int(v);
+	if( c < 0 || c > 255 )
+		neko_error();
+	cc = (unsigned char)c;
+	POSIX_LABEL(send_urgent_again);
+	if( send(val_sock(o),&cc,1,MSG_NOSIGNAL|MSG_OOB) == SOCKET_ERROR ) {
+		HANDLE_EINTR(send_urgent_again);
+		return block_error();
+	}
+	return val_true;
+}
+
+/**
+	socket_recv_urgent : 'socket -> int
+	<doc>Read the urgent byte from a connected TCP socket.</doc>
+**/
+static value socket_recv_urgent( value o ) {
+	int ret;
+	int retry = 0;
+	unsigned char cc;
+	val_check_kind(o,k_socket);
+	POSIX_LABEL(recv_urgent_again);
+	if( retry++ > NRETRYS ) {
+		sock_tmp t;
+		t.sock = val_sock(o);
+		t.buf = (char*)&cc;
+		t.size = 1;
+		t.flags = MSG_NOSIGNAL|MSG_OOB;
+		neko_thread_blocking(tmp_recv,&t);
+		ret = t.ret;
+	} else
+		ret = recv(val_sock(o),&cc,1,MSG_NOSIGNAL|MSG_OOB);
+	if( ret == SOCKET_ERROR ) {
+		HANDLE_EINTR(recv_urgent_again);
+		return block_error();
+	}
+	if( ret == 0 )
+		neko_error();
+	return alloc_int(cc);
+}
+
+/**
+	socket_fast_forward : 'socket -> string
+	<doc>Read the data available from a TCP socket prior to urgent data.</doc>
+**/
+static value socket_fast_forward( value o ) {
+	buffer b;
+	char buf[256];
+	int len;
+	val_check_kind(o,k_socket);
+	b = alloc_buffer(NULL);
+	while( true ) {
+#		ifdef NEKO_WINDOWS
+		unsigned long atmark;
+		if( ioctlsocket(val_sock(o),SIOCATMARK,&atmark) != 0 )
+			neko_error();
+#		else
+		int atmark;
+		if( ioctl(val_sock(o),SIOCATMARK,&atmark) != 0 )
+			neko_error();
+#		endif
+		if( atmark != 0 )
+			break;
+		POSIX_LABEL(read_again);
+		len = recv(val_sock(o),buf,256,MSG_NOSIGNAL);
+		if( len == SOCKET_ERROR ) {
+			HANDLE_EINTR(read_again);
+			return block_error();
+		}
+		if( len == 0 )
+			break;
+		buffer_append_sub(b,buf,len);
+	}
+	return buffer_to_string(b);
+}
+
+/**
+	socket_set_urgent_inline : 'socket -> bool -> void
+	<doc>
+	Disable or enable the SO_OOBINLINE flag for the socket
+	</doc>
+**/
+static value socket_set_urgent_inline( value s, value f ) {
+	int oobinline;
+	val_check_kind(s,k_socket);
+	val_check(f,bool);
+	oobinline = val_bool(f);
+	if( setsockopt(val_sock(s),SOL_SOCKET,SO_OOBINLINE,(char*)&oobinline,sizeof(oobinline)) )
+		neko_error();
+	return val_null;
+}
+
+
 DEFINE_PRIM(socket_init,0);
 DEFINE_PRIM(socket_new,1);
 DEFINE_PRIM(socket_send,4);
@@ -867,9 +1014,14 @@ DEFINE_PRIM(socket_set_fast_send,2);
 
 DEFINE_PRIM(socket_poll_alloc,1);
 DEFINE_PRIM(socket_poll,3);
-DEFINE_PRIM(socket_poll_prepare,3);
+DEFINE_PRIM(socket_poll_prepare,4);
 DEFINE_PRIM(socket_poll_events,2);
 
+DEFINE_PRIM(socket_send_urgent,2);
+DEFINE_PRIM(socket_recv_urgent,1);
+DEFINE_PRIM(socket_fast_forward,1);
+DEFINE_PRIM(socket_set_urgent_inline,2);
+
 DEFINE_PRIM(host_local,0);
 DEFINE_PRIM(host_resolve,1);
 DEFINE_PRIM(host_to_string,1);
