Thank you for the advice. library(snow) cl <- makeSOCKcluster(c("localhost","localhost")) clusterCall(cl, function(x){Sys.sleep(10);1}) clusterCall(cl, function(x){Sys.sleep(60);1}) # Timeout Code. stopCluster(cl)
I worked in AIX and Linux. diff -ruN R-devel.orig/src/include/Rinternals.h R-devel/src/include/Rinternals.h --- R-devel.orig/src/include/Rinternals.h 2007-06-18 00:50:00.000000000 +0900 +++ R-devel/src/include/Rinternals.h 2007-06-25 13:08:00.000000000 +0900 @@ -725,7 +725,7 @@ R_pstream_format_t type; int version; void (*OutChar)(R_outpstream_t, int); - void (*OutBytes)(R_outpstream_t, void *, int); + void (*OutBytes)(R_outpstream_t, void *, size_t); SEXP (*OutPersistHookFunc)(SEXP, SEXP); SEXP OutPersistHookData; }; @@ -735,7 +735,7 @@ R_pstream_data_t data; R_pstream_format_t type; int (*InChar)(R_inpstream_t); - void (*InBytes)(R_inpstream_t, void *, int); + void (*InBytes)(R_inpstream_t, void *, size_t); SEXP (*InPersistHookFunc)(SEXP, SEXP); SEXP InPersistHookData; }; @@ -743,12 +743,12 @@ void R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data, R_pstream_format_t type, int (*inchar)(R_inpstream_t), - void (*inbytes)(R_inpstream_t, void *, int), + void (*inbytes)(R_inpstream_t, void *, size_t), SEXP (*phook)(SEXP, SEXP), SEXP pdata); void R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data, R_pstream_format_t type, int version, void (*outchar)(R_outpstream_t, int), - void (*outbytes)(R_outpstream_t, void *, int), + void (*outbytes)(R_outpstream_t, void *, size_t), SEXP (*phook)(SEXP, SEXP), SEXP pdata); void R_InitFileInPStream(R_inpstream_t stream, FILE *fp, diff -ruN R-devel.orig/src/main/serialize.c R-devel/src/main/serialize.c --- R-devel.orig/src/main/serialize.c 2007-06-18 00:50:02.000000000 +0900 +++ R-devel/src/main/serialize.c 2007-06-25 19:32:05.000000000 +0900 @@ -1529,7 +1529,7 @@ R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data, R_pstream_format_t type, int (*inchar)(R_inpstream_t), - void (*inbytes)(R_inpstream_t, void *, int), + void (*inbytes)(R_inpstream_t, void *, size_t), SEXP (*phook)(SEXP, SEXP), SEXP pdata) { stream->data = data; @@ -1544,7 +1544,7 @@ R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t 
data, R_pstream_format_t type, int version, void (*outchar)(R_outpstream_t, int), - void (*outbytes)(R_outpstream_t, void *, int), + void (*outbytes)(R_outpstream_t, void *, size_t), SEXP (*phook)(SEXP, SEXP), SEXP pdata) { stream->data = data; @@ -1574,14 +1574,14 @@ return fgetc(fp); } -static void OutBytesFile(R_outpstream_t stream, void *buf, int length) +static void OutBytesFile(R_outpstream_t stream, void *buf, size_t length) { FILE *fp = stream->data; size_t out = fwrite(buf, 1, length, fp); if (out != length) error(_("write failed")); } -static void InBytesFile(R_inpstream_t stream, void *buf, int length) +static void InBytesFile(R_inpstream_t stream, void *buf, size_t length) { FILE *fp = stream->data; size_t in = fread(buf, 1, length, fp); @@ -1629,12 +1629,12 @@ error(_("cannot write to this connection")); } -static void InBytesConn(R_inpstream_t stream, void *buf, int length) +static void InBytesConn(R_inpstream_t stream, void *buf, size_t length) { Rconnection con = (Rconnection) stream->data; CheckInConn(con); if (con->text) { - int i; + size_t i; char *p = buf; for (i = 0; i < length; i++) p[i] = Rconn_fgetc(con); @@ -1659,7 +1659,7 @@ } } -static void OutBytesConn(R_outpstream_t stream, void *buf, int length) +static void OutBytesConn(R_outpstream_t stream, void *buf, size_t length) { Rconnection con = (Rconnection) stream->data; CheckOutConn(con); @@ -1817,7 +1817,7 @@ bb->buf[bb->count++] = c; } -static void OutBytesBB(R_outpstream_t stream, void *buf, int length) +static void OutBytesBB(R_outpstream_t stream, void *buf, size_t length) { bconbuf_t bb = stream->data; if (bb->count + length > BCONBUFSIZ) @@ -1863,14 +1863,14 @@ */ typedef struct membuf_st { - int size; - int count; + size_t size; + size_t count; unsigned char *buf; } *membuf_t; -static void resize_buffer(membuf_t mb, int needed) +static void resize_buffer(membuf_t mb, size_t needed) { - int newsize = 2 * needed; + size_t newsize = 2 * needed; mb->buf = realloc(mb->buf, newsize); if 
(mb->buf == NULL) error(_("cannot allocate buffer")); @@ -1885,7 +1885,7 @@ mb->buf[mb->count++] = c; } -static void OutBytesMem(R_outpstream_t stream, void *buf, int length) +static void OutBytesMem(R_outpstream_t stream, void *buf, size_t length) { membuf_t mb = stream->data; if (mb->count + length > mb->size) @@ -1902,7 +1902,7 @@ return mb->buf[mb->count++]; } -static void InBytesMem(R_inpstream_t stream, void *buf, int length) +static void InBytesMem(R_inpstream_t stream, void *buf, size_t length) { membuf_t mb = stream->data; if (mb->count + length > mb->size) @@ -1912,7 +1912,7 @@ } static void InitMemInPStream(R_inpstream_t stream, membuf_t mb, - void *buf, int length, + void *buf, size_t length, SEXP (*phook)(SEXP, SEXP), SEXP pdata) { mb->count = 0; diff -ruN R-devel.orig/src/modules/internet/sockconn.c R-devel/src/modules/internet/sockconn.c --- R-devel.orig/src/modules/internet/sockconn.c 2007-06-03 00:50:32.000000000 +0900 +++ R-devel/src/modules/internet/sockconn.c 2007-06-25 19:41:32.000000000 +0900 @@ -155,14 +155,19 @@ static size_t sock_read(void *ptr, size_t size, size_t nitems, Rconnection con) { + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv)); + + R_SockTimeout(timeout); return sock_read_helper(con, ptr, size * nitems)/size; } static size_t sock_write(const void *ptr, size_t size, size_t nitems, Rconnection con) { + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv)); Rsockconn this = (Rsockconn)con->private; + R_SockTimeout(timeout); return R_SockWrite(this->fd, ptr, size * nitems)/size; } 2007/6/25, Prof Brian Ripley <[EMAIL PROTECTED]>: > On Mon, 25 Jun 2007, Ei-ji Nakama wrote: > > > problem of the very large memory require by the Sign extension. 
> > > > --- R-2.5.0.orig/src/main/serialize.c 2007-03-27 01:42:08.000000000 +0900 > > +++ R-2.5.0/src/main/serialize.c 2007-06-25 00:48:58.000000000 +0900 > > @@ -1866,7 +1866,7 @@ > > > > static void resize_buffer(membuf_t mb, int needed) > > { > > - int newsize = 2 * needed; > > + size_t newsize = 2 * needed; > > mb->buf = realloc(mb->buf, newsize); > > if (mb->buf == NULL) > > error(_("cannot allocate buffer")); > > Yes, thanks, but the structure also needs to be changed as the next line > is > > mb->size = newsize; > > and so this would set mb->size to a negative value. > > Could you please tell us where you encountered this? > > As far as I can see, this code is only used via R_serialize for > serializing to a raw vector, in which case 'size' can safely be 'int' and > the re-allocation should be up to 2^31-1 bytes at most (and allocating > twice what you are asked for seems undesirable). But there is potential > overflow at > > if (mb->count + length > mb->size) > > and possibly elsewhere. > > > > The time-out of read and write was not set. > > > > 51:sendData.SOCKnode <- function(node, data) { > > 52: timeout <- getClusterOption("timeout") > > 53: old <- options(timeout = timeout); > > 54: on.exit(options(old)) > > 55: serialize(data, node$con) > > 56: } > > 57: > > 58:recvData.SOCKnode <- function(node) { > > 59: timeout <- getClusterOption("timeout") > > 60: old <- options(timeout = timeout); > > 61: on.exit(options(old)) > > 62: unserialize(node$con) > > 63: } > > I don't think sock_read/sock_write is the right place to make that > setting. Ideally it would be set when the option is set, but as this is > in a module that needs an extension to the interface. > > Looking at the code, we read from a socket in blocks of 4096, but we write > in a single block. The latter is likely to be the problem, and I think > some redesigning is necessary here. > > Perhaps you and Luke Tierney can comment on exactly what the problem is > and how best to work around it. 
> > > > > --- R-2.5.0.orig/src/modules/internet/sockconn.c 2006-09-04 > > 23:20:59.000000000 +0900 > > +++ R-2.5.0/src/modules/internet/sockconn.c 2007-06-25 > > 00:51:38.000000000 +0900 > > @@ -155,14 +155,19 @@ > > static size_t sock_read(void *ptr, size_t size, size_t nitems, > > Rconnection con) > > { > > + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv)); > > + > > + R_SockTimeout(timeout); > > return sock_read_helper(con, ptr, size * nitems)/size; > > } > > > > static size_t sock_write(const void *ptr, size_t size, size_t nitems, > > Rconnection con) > > { > > + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv)); > > Rsockconn this = (Rsockconn)con->private; > > > > + R_SockTimeout(timeout); > > return R_SockWrite(this->fd, ptr, size * nitems)/size; > > } > > > > > > -- > Brian D. Ripley, [EMAIL PROTECTED] > Professor of Applied Statistics, http://www.stats.ox.ac.uk/~ripley/ > University of Oxford, Tel: +44 1865 272861 (self) > 1 South Parks Road, +44 1865 272866 (PA) > Oxford OX1 3TG, UK Fax: +44 1865 272595 > > > -- EI-JI Nakama <[EMAIL PROTECTED]> "\u4e2d\u9593\u6804\u6cbb" <[EMAIL PROTECTED]> ______________________________________________ R-devel@r-project.org mailing list https://stat.ethz.ch/mailman/listinfo/r-devel