Changeset: c98fbdc33cf5 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c98fbdc33cf5
Modified Files:
        common/stream/xz_stream.c
Branch: makelibstreamgreatagain
Log Message:

Rewrite xz_stream.c to be less wrong

The existing code did not take into account that multiple
invocations of lzma_code() might be necessary to process
all input.


diffs (183 lines):

diff --git a/common/stream/xz_stream.c b/common/stream/xz_stream.c
--- a/common/stream/xz_stream.c
+++ b/common/stream/xz_stream.c
@@ -22,6 +22,75 @@ typedef struct xz_stream {
        uint8_t buf[XZBUFSIZ];
 } xz_stream;
 
+/* Keep calling lzma_code until the whole input buffer has been consumed
+ * and all necessary output has been written.
+ *
+ * If action is LZMA_RUN, iteration ends as soon as lzma_code() has
+ * consumed all input. Output still pending in the encoder or in xz->buf
+ * is not flushed because we expect further invocations.
+ *
+ * If action is something else, for example LZMA_FINISH, iteration ends
+ * when lzma_code() returns LZMA_STREAM_END and the out buffer is flushed
+ * afterward.
+ *
+ * Returns 1 on success, 0 on error.
+ */
+static int
+pump_out(xz_stream *xz, lzma_action action)
+{
+       while (1) {
+               // Make sure there is room in the output buffer
+               if (xz->strm.avail_out == 0) {
+                       size_t nwritten = fwrite(xz->buf, 1, XZBUFSIZ, xz->fp);
+                       if (nwritten != XZBUFSIZ) {
+                               return 0;
+                       }
+                       xz->strm.next_out = xz->buf;
+                       xz->strm.avail_out = XZBUFSIZ;
+               }
+
+               lzma_ret ret = lzma_code(&xz->strm, action);
+               if (ret != LZMA_OK && ret != LZMA_STREAM_END) {
+                       // Some kind of error.
+                       return 0;
+               }
+
+               if (xz->strm.avail_in > 0) {
+                       // Definitely not done yet. Go around again, making room
+                       // in the output buffer first if it is full.
+                       continue;
+               }
+
+               // Whether we are already done or not depends on the mode.
+               if (action == LZMA_RUN) {
+                       assert(ret == LZMA_OK);
+                       // More input will follow so we can leave the output buffer
+                       // for later.
+                       return 1;
+               }
+
+               // We need to flush all data out of the encoder and out of our
+               // buffer.
+               if (ret == LZMA_OK) {
+                       // encoder requests another iteration
+                       continue;
+               }
+
+               // That was it.  Flush the remainder of the buffer and exit.
+               size_t amount = xz->strm.next_out - xz->buf;
+               if (amount > 0) {
+                       size_t nwritten = fwrite(xz->buf, 1, amount, xz->fp);
+                       if (nwritten != amount) {
+                               return 0;
+                       }
+               }
+               xz->strm.next_out = xz->buf;
+               xz->strm.avail_out = XZBUFSIZ;
+               return 1;
+       }
+}
+
+
 static ssize_t
 stream_xzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
 {
@@ -94,7 +163,9 @@ stream_xzwrite(stream *restrict s, const
 {
        xz_stream *xz = s->stream_data.p;
        size_t size = elmsize * cnt;
-       lzma_action action = LZMA_RUN;
+
+       if (size == 0)
+               return cnt;
 
        if (xz == NULL) {
                s->errnr = MNSTR_WRITE_ERROR;
@@ -103,50 +174,29 @@ stream_xzwrite(stream *restrict s, const
 
        xz->strm.next_in = buf;
        xz->strm.avail_in = size;
-       xz->strm.next_out = xz->buf;
-       xz->strm.avail_out = XZBUFSIZ;
 
-       size = 0;
-       while (xz->strm.avail_in) {
-               size_t sz = 0, isz = xz->strm.avail_in;
-
-               lzma_ret ret = lzma_code(&xz->strm, action);
-               if (xz->strm.avail_out == 0 || ret != LZMA_OK) {
-                       s->errnr = MNSTR_WRITE_ERROR;
-                       return -1;
-               }
-               sz = XZBUFSIZ - xz->strm.avail_out;
-               if (fwrite(xz->buf, 1, sz, xz->fp) != sz) {
-                       s->errnr = MNSTR_WRITE_ERROR;
-                       return -1;
-               }
-               assert(xz->strm.avail_in == 0);
-               size += isz;
-               xz->strm.next_out = xz->buf;
-               xz->strm.avail_out = XZBUFSIZ;
+       if (pump_out(xz, LZMA_RUN))
+               return (ssize_t) (size / elmsize);
+       else {
+               s->errnr = MNSTR_WRITE_ERROR;
+               return -1;
        }
-       if (size)
-               return (ssize_t) (size / elmsize);
-       return (ssize_t) cnt;
 }
 
 static void
 stream_xzclose(stream *s)
 {
        xz_stream *xz = s->stream_data.p;
-       xz->strm.next_out = xz->buf;
-       xz->strm.avail_out = XZBUFSIZ;
 
        if (xz) {
                if (!s->readonly) {
-                       lzma_ret ret = lzma_code(&xz->strm, LZMA_FINISH);
-
-                       if (xz->strm.avail_out && ret == LZMA_STREAM_END) {
-                               size_t sz = XZBUFSIZ - xz->strm.avail_out;
-                               if (fwrite(xz->buf, 1, sz, xz->fp) != sz)
-                                       s->errnr = MNSTR_WRITE_ERROR;
+                       xz->strm.next_in = NULL;
+                       xz->strm.avail_in = 0;
+                       if (!pump_out(xz, LZMA_FINISH) || fflush(xz->fp) != 0) {
+                               // finishing the xz stream or flushing it failed;
+                               // close returns void, so errnr is the only report
+                               s->errnr = MNSTR_WRITE_ERROR;
                        }
-                       fflush(xz->fp);
                }
                fclose(xz->fp);
                lzma_end(&xz->strm);
@@ -160,10 +210,19 @@ stream_xzflush(stream *s)
 {
        xz_stream *xz = s->stream_data.p;
 
+       if (s->readonly)
+               return 0;
        if (xz == NULL)
                return -1;
-       if (!s->readonly && fflush(xz->fp))
-               return -1;
+
+       xz->strm.next_in = NULL;
+       xz->strm.avail_in = 0;
+       // Drain the encoder and xz->buf to the file; report failure to caller
+       if (!pump_out(xz, LZMA_FULL_BARRIER) || fflush(xz->fp) != 0) {
+               s->errnr = MNSTR_WRITE_ERROR;
+               return -1;
+       }
+
        return 0;
 }
 
@@ -224,6 +283,8 @@ open_xzstream(const char *restrict filen
        s->close = stream_xzclose;
        s->flush = stream_xzflush;
        s->stream_data.p = (void *) xz;
+       xz->strm.next_out = xz->buf;
+       xz->strm.avail_out = XZBUFSIZ;
        if (flags[0] == 'r' && flags[1] != 'b') {
                char buf[UTF8BOMLENGTH];
                if (stream_xzread(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH &&
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to