Changeset: c98fbdc33cf5 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c98fbdc33cf5 Modified Files: common/stream/xz_stream.c Branch: makelibstreamgreatagain Log Message:
Rewrite xz_stream.c to be less wrong The existing code did not take into account that multiple invocations of lzma_code() might be necessary to process all input. diffs (183 lines): diff --git a/common/stream/xz_stream.c b/common/stream/xz_stream.c --- a/common/stream/xz_stream.c +++ b/common/stream/xz_stream.c @@ -22,6 +22,75 @@ typedef struct xz_stream { uint8_t buf[XZBUFSIZ]; } xz_stream; +/* Keep calling lzma_code until the whole input buffer has been consumed + * and all necessary output has been written. + * + * If action is LZMA_RUN, iteration ends when lzma_code() returns LZMA_OK + * while the output buffer is not empty. The output buffer is not flushed + * because we expect further invocations. + * + * If action is something else, for example LZMA_FINISH, iteration ends + * when lzma_code() returns LZMA_STREAM_END and the out buffer is flushed + * afterward. + * + * Returns > 0 on success, 0 on error. + */ +static int +pump_out(xz_stream *xz, lzma_action action) +{ + while (1) { + // Make sure there is room in the output buffer + if (xz->strm.avail_out == 0) { + size_t nwritten = fwrite(xz->buf, 1, XZBUFSIZ, xz->fp); + if (nwritten != XZBUFSIZ) { + return 0; + } + xz->strm.next_out = xz->buf; + xz->strm.avail_out = XZBUFSIZ; + } + + lzma_ret ret = lzma_code(&xz->strm, action); + if (ret != LZMA_OK && ret != LZMA_STREAM_END) { + // Some kind of error. + return 0; + } + + if (xz->strm.avail_in > 0) { + // Definitely not done yet. Flush the buffer and encode + // some more. + continue; + } + + // Whether we are already done or not depends on the mode. + if (action == LZMA_RUN) { + assert(ret == LZMA_OK); + // More input will follow so we can leave the output buffer + // for later. + return 1; + } + + // We need to flush all data out of the encoder and out of our + // buffer. + if (ret == LZMA_OK) { + // encoder requests another iteration + continue; + } + + // That was it. Flush the remainder of the buffer and exit. 
+ size_t amount = xz->strm.next_out - xz->buf; + if (amount > 0) { + size_t nwritten = fwrite(xz->buf, 1, amount, xz->fp); + if (nwritten != amount) { + return 0; + } + } + xz->strm.next_out = xz->buf; + xz->strm.avail_out = XZBUFSIZ; + return 1; + } +} + + static ssize_t stream_xzread(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) { @@ -94,7 +163,9 @@ stream_xzwrite(stream *restrict s, const { xz_stream *xz = s->stream_data.p; size_t size = elmsize * cnt; - lzma_action action = LZMA_RUN; + + if (size == 0) + return cnt; if (xz == NULL) { s->errnr = MNSTR_WRITE_ERROR; @@ -103,50 +174,29 @@ stream_xzwrite(stream *restrict s, const xz->strm.next_in = buf; xz->strm.avail_in = size; - xz->strm.next_out = xz->buf; - xz->strm.avail_out = XZBUFSIZ; - size = 0; - while (xz->strm.avail_in) { - size_t sz = 0, isz = xz->strm.avail_in; - - lzma_ret ret = lzma_code(&xz->strm, action); - if (xz->strm.avail_out == 0 || ret != LZMA_OK) { - s->errnr = MNSTR_WRITE_ERROR; - return -1; - } - sz = XZBUFSIZ - xz->strm.avail_out; - if (fwrite(xz->buf, 1, sz, xz->fp) != sz) { - s->errnr = MNSTR_WRITE_ERROR; - return -1; - } - assert(xz->strm.avail_in == 0); - size += isz; - xz->strm.next_out = xz->buf; - xz->strm.avail_out = XZBUFSIZ; + if (pump_out(xz, LZMA_RUN)) + return (ssize_t) (size / elmsize); + else { + s->errnr = MNSTR_WRITE_ERROR; + return -1; } - if (size) - return (ssize_t) (size / elmsize); - return (ssize_t) cnt; } static void stream_xzclose(stream *s) { xz_stream *xz = s->stream_data.p; - xz->strm.next_out = xz->buf; - xz->strm.avail_out = XZBUFSIZ; if (xz) { if (!s->readonly) { - lzma_ret ret = lzma_code(&xz->strm, LZMA_FINISH); - - if (xz->strm.avail_out && ret == LZMA_STREAM_END) { - size_t sz = XZBUFSIZ - xz->strm.avail_out; - if (fwrite(xz->buf, 1, sz, xz->fp) != sz) - s->errnr = MNSTR_WRITE_ERROR; + xz->strm.next_in = NULL; + xz->strm.avail_in = 0; + if (pump_out(xz, LZMA_FINISH)) { + fflush(xz->fp); + } else { + s->errnr = MNSTR_WRITE_ERROR; } - 
fflush(xz->fp); } fclose(xz->fp); lzma_end(&xz->strm); @@ -160,10 +210,19 @@ stream_xzflush(stream *s) { xz_stream *xz = s->stream_data.p; + if (s->readonly) + return 0; if (xz == NULL) return -1; - if (!s->readonly && fflush(xz->fp)) - return -1; + + xz->strm.next_in = NULL; + xz->strm.avail_in = 0; + if (pump_out(xz, LZMA_FULL_BARRIER)) { + fflush(xz->fp); + } else { + s->errnr = MNSTR_WRITE_ERROR; + } + return 0; } @@ -224,6 +283,8 @@ open_xzstream(const char *restrict filen s->close = stream_xzclose; s->flush = stream_xzflush; s->stream_data.p = (void *) xz; + xz->strm.next_out = xz->buf; + xz->strm.avail_out = XZBUFSIZ; if (flags[0] == 'r' && flags[1] != 'b') { char buf[UTF8BOMLENGTH]; if (stream_xzread(s, buf, 1, UTF8BOMLENGTH) == UTF8BOMLENGTH && _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list