Changeset: ef558e1a0506 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ef558e1a0506 Modified Files: common/stream/stream.c Branch: Jan2014 Log Message:
Propagate errors up, propagate error clearing down, cleanup. The mnstr_readline function is now defined similarly to the fgets library function in that it reads at most one byte fewer than the size of the buffer, and that the result is NUL-terminated. Removed socket_readline since it was basically the same as mnstr_readline itself. And since this was the only specialized readline function, removed the readline function pointer from the stream structure. Added a clrerr function pointer to the stream structure that is used to clear errors from layered streams. Simplified socket_read to not contain a loop. The upper layers need to do the loop already, so there is no sense in doing that at multiple levels. If socket_read is called with an element size greater than 1, and if the read/recv returns an incomplete record, we now save that record for the next call. (This may be useful when we have a relatively short socket read timeout.) diffs (truncated from 507 to 300 lines): diff --git a/common/stream/stream.c b/common/stream/stream.c --- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -149,9 +149,9 @@ struct stream { } stream_data; int errnr; ssize_t (*read) (stream *s, void *buf, size_t elmsize, size_t cnt); - ssize_t (*readline) (stream *s, void *buf, size_t maxcnt); ssize_t (*write) (stream *s, const void *buf, size_t elmsize, size_t cnt); void (*close) (stream *s); + void (*clrerr) (stream *s); char *(*error) (stream *s); void (*destroy) (stream *s); int (*flush) (stream *s); @@ -159,6 +159,11 @@ struct stream { int (*fgetpos) (stream *s, lng *p); int (*fsetpos) (stream *s, lng p); void (*update_timeout) (stream *s); + /* in case read() read a non-integral number of elements we + * save the last partial element here (only used in + * socket_read() */ + void *buf; + size_t len; }; int @@ -198,30 +203,61 @@ mnstr_read(stream *s, void *buf, size_t return (*s->read) (s, buf, elmsize, cnt); } -/* Read one line (seperated by \n) of atmost maxcnt characters from +/* Read 
one line (seperated by \n) of at most maxcnt-1 characters from * the stream. Returns the number of characters actually read, - * includes the trailing \n */ + * includes the trailing \n; terminated by a NULL byte. */ ssize_t mnstr_readline(stream *s, void *buf, size_t maxcnt) { + char *b = buf, *start = buf; + #ifdef STREAM_DEBUG printf("readline %s " SZFMT "\n", s->name ? s->name : "<unnamed>", maxcnt); #endif assert(s->access == ST_READ); if (s->errnr) return -1; - if (!s->readline) { - size_t len = 0; - char *b = buf, *start = buf; - while ((*s->read) (s, start, 1, 1) > 0 && len < maxcnt) { - if (*start++ == '\n') - break; + if (maxcnt == 0) + return 0; + if (maxcnt == 1) { + *start = 0; + return 0; + } + for (;;) { + switch ((*s->read)(s, start, 1, 1)) { + case 1: + /* successfully read a character, + * check whether it is the line + * separator and whether we have space + * left for more */ + if (*start++ == '\n' || --maxcnt == 1) { + *start = 0; +#if 0 + if (s->type == ST_ASCII && + start[-1] == '\n' && + start > b + 1 && + start[-2] == '\r') { + /* convert CR-LF to just LF */ + start[-2] = start[-1]; + start--; + } +#endif + return (ssize_t) (start - b); + } + break; + case -1: + /* error: if we didn't read anything yet, + * return the error, otherwise return what we + * have */ + if (start == b) + return -1; + /* fall through */ + case 0: + /* end of file: return what we have */ + *start = 0; + return (ssize_t) (start - b); } - if (s->errnr) - return -1; - return (ssize_t) (start - b); - } else - return (*s->readline) (s, buf, maxcnt); + } } /* Write cnt elements of size elmsize to the stream. 
Returns the @@ -361,8 +397,11 @@ mnstr_errnr(stream *s) void mnstr_clearerr(stream *s) { - if (s != NULL) + if (s != NULL) { s->errnr = MNSTR_NO__ERROR; + if (s->clrerr) + (*s->clrerr) (s); + } } int @@ -442,6 +481,8 @@ get_extention(const char *file) static void destroy(stream *s) { + if (s->buf) + free(s->buf); free(s->name); free(s); } @@ -481,9 +522,9 @@ create_stream(const char *name) s->errnr = MNSTR_NO__ERROR; s->stream_data.p = NULL; s->read = NULL; - s->readline = NULL; s->write = NULL; s->close = NULL; + s->clrerr = NULL; s->error = error; s->destroy = destroy; s->flush = NULL; @@ -492,6 +533,8 @@ create_stream(const char *name) s->fsetpos = NULL; s->timeout = 0; s->update_timeout = NULL; + s->buf = NULL; + s->len = 0; #ifdef STREAM_DEBUG printf("create_stream %s -> " PTRFMT "\n", name ? name : "<unnamed>", PTRFMTCAST s); #endif @@ -508,8 +551,9 @@ file_read(stream *s, void *buf, size_t e size_t rc = 0; if (!feof(fp)) { - rc = fread(buf, elmsize, cnt, fp); - if (ferror(fp)) { + if (ferror(fp) || + ((rc = fread(buf, elmsize, cnt, fp)) == 0 && + ferror(fp))) { s->errnr = MNSTR_READ_ERROR; return -1; } @@ -549,6 +593,14 @@ file_close(stream *s) s->stream_data.p = NULL; } +static void +file_clrerr(stream *s) +{ + FILE *fp = (FILE *) s->stream_data.p; + + clearerr(fp); +} + static int file_flush(stream *s) { @@ -645,9 +697,9 @@ open_stream(const char *filename, const if ((fp = fopen(filename, flags)) == NULL) s->errnr = MNSTR_OPEN_ERROR; s->read = file_read; - s->readline = NULL; s->write = file_write; s->close = file_close; + s->clrerr = file_clrerr; s->flush = file_flush; s->fsync = file_fsync; s->fgetpos = file_fgetpos; @@ -1463,65 +1515,55 @@ socket_write(stream *s, const void *buf, if (nr > 0) res += nr; } + if ((size_t) res >= elmsize) + return (ssize_t) (res / elmsize); if (nr < 0) { s->errnr = MNSTR_WRITE_ERROR; return -1; } - if (res > 0) - return (ssize_t) (res / elmsize); - s->errnr = MNSTR_WRITE_ERROR; - return -1; + return 0; } static ssize_t 
socket_read(stream *s, void *buf, size_t elmsize, size_t cnt) { - ssize_t nr = 0, res = 0, size = (ssize_t) (elmsize * cnt); - - if (!s || s->errnr) + ssize_t nr = 0, size = (ssize_t) (elmsize * cnt); + + if (!s || s->errnr || size == 0) return -1; - errno = 0; - while (res < size && - ( + assert((s->buf == NULL) == (s->len == 0)); + if (s->buf) { + assert((size_t) size > s->len); + memcpy(buf, s->buf, s->len); + } + #ifdef NATIVE_WIN32 - /* recv works on int, make sure the argument fits */ - ((nr = recv(s->stream_data.s, (void *) ((char *) buf + res), - (int) min(size - res, 1 << 16), 0)) > 0) + if (size > INT_MAX) + size = elmsize * (INT_MAX / elmsize); + nr = recv(s->stream_data.s, (char *) buf + s->len, (int) (size - s->len), 0); #else - ((nr = read(s->stream_data.s, (void *) ((char *) buf + res), - size - res)) > 0) + nr = read(s->stream_data.s, (char *) buf + s->len, size - s->len); #endif - || (s->timeout == 0 - && (errno == EAGAIN || errno == EWOULDBLOCK)) - || errno == EINTR) - ) { - errno = 0; - if (nr > 0) - res += nr; - } - if (nr < 0) { + if (nr == -1) { s->errnr = MNSTR_READ_ERROR; return -1; } - return (ssize_t) (res / elmsize); -} - -/* Read one line (seperated by \n) of at most maxcnt characters from the - * stream. Returns the number of characters actually read, includes the - * trailing \n. 
*/ -static ssize_t -socket_readline(stream *s, void *buf, size_t maxcnt) -{ - char *b = buf, *start = buf, *end = start + maxcnt; - - while (socket_read(s, start, 1, 1) > 0 && start < end) { - if (*start++ == '\n') - break; + if (nr == 0) + return 0; /* end of file */ + if (s->buf) { + nr += s->len; + free(s->buf); + s->buf = NULL; + s->len = 0; } - if (s->errnr) - return -1; - return (ssize_t) (start - b); + if (elmsize > 1 && (cnt = nr % elmsize) != 0) { + s->buf = malloc(cnt); + memcpy(s->buf, (char *) buf + nr - cnt, cnt); + s->len = cnt; + nr -= cnt; + } + return (ssize_t) (nr / elmsize); } static void @@ -1547,6 +1589,10 @@ socket_close(stream *s) } } s->stream_data.s = INVALID_SOCKET; + if (s->buf) + free(s->buf); + s->buf = NULL; + s->len = 0; } static void @@ -1571,10 +1617,8 @@ socket_open(SOCKET sock, const char *nam if ((s = create_stream(name)) == NULL) return NULL; s->read = socket_read; - s->readline = socket_readline; s->write = socket_write; s->close = socket_close; - s->flush = NULL; s->stream_data.s = sock; s->update_timeout = socket_update_timeout; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list