Changeset: ef558e1a0506 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ef558e1a0506
Modified Files:
        common/stream/stream.c
Branch: Jan2014
Log Message:

Propagate errors up, propagate error clearing down, cleanup.

The mnstr_readline function is now defined similarly to the fgets
library function in that it reads at most one byte fewer than the
size of the buffer, and in that the result is null-terminated.
Removed socket_readline since it was basically the same as
mnstr_readline itself.  And since this was the only specialized
readline function, removed the readline function pointer from the
stream structure.
Added a clrerr function pointer to the stream structure that is used
to clear errors from layered streams.
Simplified socket_read so that it no longer contains a loop.  The upper
layers already need to loop, so there is no sense in doing that at
multiple levels.
If socket_read is called with an element size greater than 1, and if
the read/recv returns an incomplete record, we now save that record
for the next call.  (This may be useful when we have a relatively
short socket read timeout.)


diffs (truncated from 507 to 300 lines):

diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -149,9 +149,9 @@ struct stream {
        } stream_data;
        int errnr;
        ssize_t (*read) (stream *s, void *buf, size_t elmsize, size_t cnt);
-       ssize_t (*readline) (stream *s, void *buf, size_t maxcnt);
        ssize_t (*write) (stream *s, const void *buf, size_t elmsize, size_t 
cnt);
        void (*close) (stream *s);
+       void (*clrerr) (stream *s);
        char *(*error) (stream *s);
        void (*destroy) (stream *s);
        int (*flush) (stream *s);
@@ -159,6 +159,11 @@ struct stream {
        int (*fgetpos) (stream *s, lng *p);
        int (*fsetpos) (stream *s, lng p);
        void (*update_timeout) (stream *s);
+       /* in case read() read a non-integral number of elements we
+        * save the last partial element here (only used in
+        * socket_read() */
+       void *buf;
+       size_t len;
 };
 
 int
@@ -198,30 +203,61 @@ mnstr_read(stream *s, void *buf, size_t 
        return (*s->read) (s, buf, elmsize, cnt);
 }
 
-/* Read one line (seperated by \n) of atmost maxcnt characters from
+/* Read one line (seperated by \n) of at most maxcnt-1 characters from
  * the stream.  Returns the number of characters actually read,
- * includes the trailing \n */
+ * includes the trailing \n; terminated by a NULL byte. */
 ssize_t
 mnstr_readline(stream *s, void *buf, size_t maxcnt)
 {
+       char *b = buf, *start = buf;
+
 #ifdef STREAM_DEBUG
        printf("readline %s " SZFMT "\n", s->name ? s->name : "<unnamed>", 
maxcnt);
 #endif
        assert(s->access == ST_READ);
        if (s->errnr)
                return -1;
-       if (!s->readline) {
-               size_t len = 0;
-               char *b = buf, *start = buf;
-               while ((*s->read) (s, start, 1, 1) > 0 && len < maxcnt) {
-                       if (*start++ == '\n')
-                               break;
+       if (maxcnt == 0)
+               return 0;
+       if (maxcnt == 1) {
+               *start = 0;
+               return 0;
+       }
+       for (;;) {
+               switch ((*s->read)(s, start, 1, 1)) {
+               case 1:
+                       /* successfully read a character,
+                        * check whether it is the line
+                        * separator and whether we have space
+                        * left for more */
+                       if (*start++ == '\n' || --maxcnt == 1) {
+                               *start = 0;
+#if 0
+                               if (s->type == ST_ASCII &&
+                                   start[-1] == '\n' &&
+                                   start > b + 1 &&
+                                   start[-2] == '\r') {
+                                       /* convert CR-LF to just LF */
+                                       start[-2] = start[-1];
+                                       start--;
+                               }
+#endif
+                               return (ssize_t) (start - b);
+                       }
+                       break;
+               case -1:
+                       /* error: if we didn't read anything yet,
+                        * return the error, otherwise return what we
+                        * have */
+                       if (start == b)
+                               return -1;
+                       /* fall through */
+               case 0:
+                       /* end of file: return what we have */
+                       *start = 0;
+                       return (ssize_t) (start - b);
                }
-               if (s->errnr)
-                       return -1;
-               return (ssize_t) (start - b);
-       } else
-               return (*s->readline) (s, buf, maxcnt);
+       }
 }
 
 /* Write cnt elements of size elmsize to the stream.  Returns the
@@ -361,8 +397,11 @@ mnstr_errnr(stream *s)
 void
 mnstr_clearerr(stream *s)
 {
-       if (s != NULL)
+       if (s != NULL) {
                s->errnr = MNSTR_NO__ERROR;
+               if (s->clrerr)
+                       (*s->clrerr) (s);
+       }
 }
 
 int
@@ -442,6 +481,8 @@ get_extention(const char *file)
 static void
 destroy(stream *s)
 {
+       if (s->buf)
+               free(s->buf);
        free(s->name);
        free(s);
 }
@@ -481,9 +522,9 @@ create_stream(const char *name)
        s->errnr = MNSTR_NO__ERROR;
        s->stream_data.p = NULL;
        s->read = NULL;
-       s->readline = NULL;
        s->write = NULL;
        s->close = NULL;
+       s->clrerr = NULL;
        s->error = error;
        s->destroy = destroy;
        s->flush = NULL;
@@ -492,6 +533,8 @@ create_stream(const char *name)
        s->fsetpos = NULL;
        s->timeout = 0;
        s->update_timeout = NULL;
+       s->buf = NULL;
+       s->len = 0;
 #ifdef STREAM_DEBUG
        printf("create_stream %s -> " PTRFMT "\n", name ? name : "<unnamed>", 
PTRFMTCAST s);
 #endif
@@ -508,8 +551,9 @@ file_read(stream *s, void *buf, size_t e
        size_t rc = 0;
 
        if (!feof(fp)) {
-               rc = fread(buf, elmsize, cnt, fp);
-               if (ferror(fp)) {
+               if (ferror(fp) ||
+                   ((rc = fread(buf, elmsize, cnt, fp)) == 0 &&
+                    ferror(fp))) {
                        s->errnr = MNSTR_READ_ERROR;
                        return -1;
                }
@@ -549,6 +593,14 @@ file_close(stream *s)
        s->stream_data.p = NULL;
 }
 
+static void
+file_clrerr(stream *s)
+{
+       FILE *fp = (FILE *) s->stream_data.p;
+
+       clearerr(fp);
+}
+
 static int
 file_flush(stream *s)
 {
@@ -645,9 +697,9 @@ open_stream(const char *filename, const 
        if ((fp = fopen(filename, flags)) == NULL)
                s->errnr = MNSTR_OPEN_ERROR;
        s->read = file_read;
-       s->readline = NULL;
        s->write = file_write;
        s->close = file_close;
+       s->clrerr = file_clrerr;
        s->flush = file_flush;
        s->fsync = file_fsync;
        s->fgetpos = file_fgetpos;
@@ -1463,65 +1515,55 @@ socket_write(stream *s, const void *buf,
                if (nr > 0)
                        res += nr;
        }
+       if ((size_t) res >= elmsize)
+               return (ssize_t) (res / elmsize);
        if (nr < 0) {
                s->errnr = MNSTR_WRITE_ERROR;
                return -1;
        }
-       if (res > 0)
-               return (ssize_t) (res / elmsize);
-       s->errnr = MNSTR_WRITE_ERROR;
-       return -1;
+       return 0;
 }
 
 static ssize_t
 socket_read(stream *s, void *buf, size_t elmsize, size_t cnt)
 {
-       ssize_t nr = 0, res = 0, size = (ssize_t) (elmsize * cnt);
-
-       if (!s || s->errnr)
+       ssize_t nr = 0, size = (ssize_t) (elmsize * cnt);
+
+       if (!s || s->errnr || size == 0)
                return -1;
 
-       errno = 0;
-       while (res < size &&
-              (
+       assert((s->buf == NULL) == (s->len == 0));
+       if (s->buf) {
+               assert((size_t) size > s->len);
+               memcpy(buf, s->buf, s->len);
+       }
+
 #ifdef NATIVE_WIN32
-               /* recv works on int, make sure the argument fits */
-               ((nr = recv(s->stream_data.s, (void *) ((char *) buf + res),
-                           (int) min(size - res, 1 << 16), 0)) > 0)
+       if (size > INT_MAX)
+               size = elmsize * (INT_MAX / elmsize);
+       nr = recv(s->stream_data.s, (char *) buf + s->len, (int) (size - 
s->len), 0);
 #else
-               ((nr = read(s->stream_data.s, (void *) ((char *) buf + res),
-                           size - res)) > 0)
+       nr = read(s->stream_data.s, (char *) buf + s->len, size - s->len);
 #endif
-               || (s->timeout == 0
-                   && (errno == EAGAIN || errno == EWOULDBLOCK))
-               || errno == EINTR)
-               ) {
-               errno = 0;
-               if (nr > 0)
-                       res += nr;
-       }
-       if (nr < 0) {
+       if (nr == -1) {
                s->errnr = MNSTR_READ_ERROR;
                return -1;
        }
-       return (ssize_t) (res / elmsize);
-}
-
-/* Read one line (seperated by \n) of at most maxcnt characters from the
- * stream. Returns the number of characters actually read, includes the
- * trailing \n. */
-static ssize_t
-socket_readline(stream *s, void *buf, size_t maxcnt)
-{
-       char *b = buf, *start = buf, *end = start + maxcnt;
-
-       while (socket_read(s, start, 1, 1) > 0 && start < end) {
-               if (*start++ == '\n')
-                       break;
+       if (nr == 0)
+               return 0;       /* end of file */
+       if (s->buf) {
+               nr += s->len;
+               free(s->buf);
+               s->buf = NULL;
+               s->len = 0;
        }
-       if (s->errnr)
-               return -1;
-       return (ssize_t) (start - b);
+       if (elmsize > 1 && (cnt = nr % elmsize) != 0) {
+               s->buf = malloc(cnt);
+               memcpy(s->buf, (char *) buf + nr - cnt, cnt);
+               s->len = cnt;
+               nr -= cnt;
+       }
+       return (ssize_t) (nr / elmsize);
 }
 
 static void
@@ -1547,6 +1589,10 @@ socket_close(stream *s)
                }
        }
        s->stream_data.s = INVALID_SOCKET;
+       if (s->buf)
+               free(s->buf);
+       s->buf = NULL;
+       s->len = 0;
 }
 
 static void
@@ -1571,10 +1617,8 @@ socket_open(SOCKET sock, const char *nam
        if ((s = create_stream(name)) == NULL)
                return NULL;
        s->read = socket_read;
-       s->readline = socket_readline;
        s->write = socket_write;
        s->close = socket_close;
-       s->flush = NULL;
        s->stream_data.s = sock;
        s->update_timeout = socket_update_timeout;
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to